This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5643cfb71d34 [SPARK-48303][CORE] Reorganize `LogKeys` 5643cfb71d34 is described below commit 5643cfb71d343133a185aa257f137074f41abfb3 Author: panbingkun <panbing...@baidu.com> AuthorDate: Thu May 16 23:20:23 2024 -0700 [SPARK-48303][CORE] Reorganize `LogKeys` ### What changes were proposed in this pull request? This PR aims to reorganize `LogKeys`, including: - remove some unused `LogKeys` ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE DEFAULT_COMPACTION_INTERVAL DRIVER_LIBRARY_PATH_KEY EXISTING_JARS EXPECTED_ANSWER FILTERS HAS_R_PACKAGE JAR_ENTRY LOG_KEY_FILE NUM_ADDED_MASTERS NUM_ADDED_WORKERS NUM_PARTITION_VALUES OUTPUT_LINE OUTPUT_LINE_NUMBER PARTITIONS_SIZE RULE_BATCH_NAME SERIALIZE_OUTPUT_LENGTH SHELL_COMMAND STREAM_SOURCE - merge `PARAMETER` into `PARAM` (some keys were spelled out in full and others abbreviated, so they were not unified) ESTIMATOR_PARAMETER_MAP -> ESTIMATOR_PARAM_MAP FUNCTION_PARAMETER -> FUNCTION_PARAM METHOD_PARAMETER_TYPES -> METHOD_PARAM_TYPES - merge `NUMBER` into `NUM` (abbreviations) MIN_VERSION_NUMBER -> MIN_VERSION_NUM RULE_NUMBER_OF_RUNS -> NUM_RULE_OF_RUNS VERSION_NUMBER -> VERSION_NUM - merge `TOTAL` into `NUM` TOTAL_RECORDS_READ -> NUM_RECORDS_READ TRAIN_WORD_COUNT -> NUM_TRAIN_WORD - `NUM` as prefix CHECKSUM_FILE_NUM -> NUM_CHECKSUM_FILE DATA_FILE_NUM -> NUM_DATA_FILE INDEX_FILE_NUM -> NUM_INDEX_FILE - `COUNT` -> `NUM` EXECUTOR_DESIRED_COUNT -> NUM_EXECUTOR_DESIRED EXECUTOR_LAUNCH_COUNT -> NUM_EXECUTOR_LAUNCH EXECUTOR_TARGET_COUNT -> NUM_EXECUTOR_TARGET KAFKA_PULLS_COUNT -> NUM_KAFKA_PULLS KAFKA_RECORDS_PULLED_COUNT -> NUM_KAFKA_RECORDS_PULLED MIN_FREQUENT_PATTERN_COUNT -> MIN_NUM_FREQUENT_PATTERN POD_COUNT -> NUM_POD POD_SHARED_SLOT_COUNT -> NUM_POD_SHARED_SLOT POD_TARGET_COUNT -> NUM_POD_TARGET RETRY_COUNT -> NUM_RETRY - fix a typo MALFORMATTED_STIRNG -> MALFORMATTED_STRING - other MAX_LOG_NUM_POLICY -> MAX_NUM_LOG_POLICY WEIGHTED_NUM -> NUM_WEIGHTED_EXAMPLES 
Changes in other code are additional changes caused by the above adjustments. ### Why are the changes needed? Let's make `LogKeys` easier to understand and more consistent. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46612 from panbingkun/reorganize_logkey. Authored-by: panbingkun <panbing...@baidu.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../network/shuffle/RetryingBlockTransferor.java | 6 +- .../scala/org/apache/spark/internal/LogKey.scala | 68 ++++++++-------------- .../sql/connect/client/GrpcRetryHandler.scala | 8 +-- .../sql/kafka010/KafkaOffsetReaderAdmin.scala | 4 +- .../sql/kafka010/KafkaOffsetReaderConsumer.scala | 4 +- .../sql/kafka010/consumer/KafkaDataConsumer.scala | 6 +- .../streaming/kinesis/KinesisBackedBlockRDD.scala | 4 +- .../org/apache/spark/api/r/RBackendHandler.scala | 4 +- .../spark/deploy/history/FsHistoryProvider.scala | 2 +- .../org/apache/spark/deploy/master/Master.scala | 2 +- .../apache/spark/ml/tree/impl/RandomForest.scala | 4 +- .../apache/spark/ml/tuning/CrossValidator.scala | 4 +- .../spark/ml/tuning/TrainValidationSplit.scala | 4 +- .../org/apache/spark/mllib/feature/Word2Vec.scala | 4 +- .../org/apache/spark/mllib/fpm/PrefixSpan.scala | 4 +- .../apache/spark/mllib/linalg/VectorsSuite.scala | 4 +- .../cluster/k8s/ExecutorPodsAllocator.scala | 6 +- ...ernetesLocalDiskShuffleExecutorComponents.scala | 6 +- .../apache/spark/deploy/yarn/YarnAllocator.scala | 6 +- .../catalyst/expressions/V2ExpressionUtils.scala | 4 +- .../spark/sql/catalyst/rules/RuleExecutor.scala | 6 +- .../sql/execution/streaming/state/RocksDB.scala | 18 +++--- .../streaming/state/RocksDBFileManager.scala | 22 +++---- .../state/RocksDBStateStoreProvider.scala | 6 +- .../apache/hive/service/server/HiveServer2.java | 2 +- .../spark/sql/hive/client/HiveClientImpl.scala | 2 +- 
.../org/apache/spark/streaming/Checkpoint.scala | 4 +- .../streaming/util/FileBasedWriteAheadLog.scala | 4 +- 28 files changed, 101 insertions(+), 117 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java index ca2073af87c1..83be2db5d0b7 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java @@ -184,7 +184,7 @@ public class RetryingBlockTransferor { logger.error("Exception while beginning {} of {} outstanding blocks (after {} retries)", e, MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()), MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, blockIdsToTransfer.length), - MDC.of(LogKeys.RETRY_COUNT$.MODULE$, numRetries)); + MDC.of(LogKeys.NUM_RETRY$.MODULE$, numRetries)); } else { logger.error("Exception while beginning {} of {} outstanding blocks", e, MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()), @@ -217,7 +217,7 @@ public class RetryingBlockTransferor { logger.info("Retrying {} ({}/{}) for {} outstanding blocks after {} ms", MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()), - MDC.of(LogKeys.RETRY_COUNT$.MODULE$, retryCount), + MDC.of(LogKeys.NUM_RETRY$.MODULE$, retryCount), MDC.of(LogKeys.MAX_ATTEMPTS$.MODULE$, maxRetries), MDC.of(LogKeys.NUM_BLOCK_IDS$.MODULE$, outstandingBlocksIds.size()), MDC.of(LogKeys.RETRY_WAIT_TIME$.MODULE$, retryWaitTime)); @@ -313,7 +313,7 @@ public class RetryingBlockTransferor { logger.error("Failed to {} block {}, and will not retry ({} retries)", exception, MDC.of(LogKeys.TRANSFER_TYPE$.MODULE$, listener.getTransferType()), MDC.of(LogKeys.BLOCK_ID$.MODULE$, blockId), - MDC.of(LogKeys.RETRY_COUNT$.MODULE$,retryCount)); + 
MDC.of(LogKeys.NUM_RETRY$.MODULE$,retryCount)); } else { logger.debug( String.format("Failed to %s block %s, and will not retry (%s retries)", diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index e03987933306..1f67a211c01f 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -31,7 +31,6 @@ trait LogKey { */ object LogKeys { case object ACCUMULATOR_ID extends LogKey - case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey case object ACTUAL_NUM_FILES extends LogKey case object ACTUAL_PARTITION_COLUMN extends LogKey case object ADDED_JARS extends LogKey @@ -80,7 +79,6 @@ object LogKeys { case object CHECKPOINT_PATH extends LogKey case object CHECKPOINT_ROOT extends LogKey case object CHECKPOINT_TIME extends LogKey - case object CHECKSUM_FILE_NUM extends LogKey case object CHOSEN_WATERMARK extends LogKey case object CLASSIFIER extends LogKey case object CLASS_LOADER extends LogKey @@ -143,10 +141,8 @@ object LogKeys { case object DATAFRAME_CACHE_ENTRY extends LogKey case object DATAFRAME_ID extends LogKey case object DATA_FILE extends LogKey - case object DATA_FILE_NUM extends LogKey case object DATA_SOURCE extends LogKey case object DATA_SOURCES extends LogKey - case object DEFAULT_COMPACTION_INTERVAL extends LogKey case object DEFAULT_COMPACT_INTERVAL extends LogKey case object DEFAULT_ISOLATION_LEVEL extends LogKey case object DEFAULT_NAME extends LogKey @@ -164,7 +160,6 @@ object LogKeys { case object DIFF_DELTA extends LogKey case object DIVISIBLE_CLUSTER_INDICES_SIZE extends LogKey case object DRIVER_ID extends LogKey - case object DRIVER_LIBRARY_PATH_KEY extends LogKey case object DRIVER_MEMORY_SIZE extends LogKey case object DRIVER_STATE extends LogKey case object DROPPED_PARTITIONS extends LogKey @@ -179,7 +174,7 @@ object LogKeys { case object 
ENGINE extends LogKey case object EPOCH extends LogKey case object ERROR extends LogKey - case object ESTIMATOR_PARAMETER_MAP extends LogKey + case object ESTIMATOR_PARAM_MAP extends LogKey case object EVALUATED_FILTERS extends LogKey case object EVENT extends LogKey case object EVENT_LOG_DESTINATION extends LogKey @@ -192,25 +187,20 @@ object LogKeys { case object EXECUTION_MEMORY_SIZE extends LogKey case object EXECUTION_PLAN_LEAVES extends LogKey case object EXECUTOR_BACKEND extends LogKey - case object EXECUTOR_DESIRED_COUNT extends LogKey case object EXECUTOR_ENVS extends LogKey case object EXECUTOR_ENV_REGEX extends LogKey case object EXECUTOR_ID extends LogKey case object EXECUTOR_IDS extends LogKey case object EXECUTOR_LAUNCH_COMMANDS extends LogKey - case object EXECUTOR_LAUNCH_COUNT extends LogKey case object EXECUTOR_MEMORY_SIZE extends LogKey case object EXECUTOR_RESOURCES extends LogKey case object EXECUTOR_SHUFFLE_INFO extends LogKey case object EXECUTOR_STATE extends LogKey - case object EXECUTOR_TARGET_COUNT extends LogKey case object EXECUTOR_TIMEOUT extends LogKey case object EXEC_AMOUNT extends LogKey case object EXISTING_FILE extends LogKey - case object EXISTING_JARS extends LogKey case object EXISTING_PATH extends LogKey case object EXIT_CODE extends LogKey - case object EXPECTED_ANSWER extends LogKey case object EXPECTED_NUM_FILES extends LogKey case object EXPECTED_PARTITION_COLUMN extends LogKey case object EXPIRY_TIMESTAMP extends LogKey @@ -237,7 +227,6 @@ object LogKeys { case object FILE_SYSTEM extends LogKey case object FILE_VERSION extends LogKey case object FILTER extends LogKey - case object FILTERS extends LogKey case object FINAL_CONTEXT extends LogKey case object FINAL_OUTPUT_PATH extends LogKey case object FINAL_PATH extends LogKey @@ -245,7 +234,7 @@ object LogKeys { case object FROM_OFFSET extends LogKey case object FROM_TIME extends LogKey case object FUNCTION_NAME extends LogKey - case object FUNCTION_PARAMETER extends 
LogKey + case object FUNCTION_PARAM extends LogKey case object GLOBAL_INIT_FILE extends LogKey case object GLOBAL_WATERMARK extends LogKey case object GROUP_BY_EXPRS extends LogKey @@ -253,7 +242,6 @@ object LogKeys { case object HADOOP_VERSION extends LogKey case object HASH_JOIN_KEYS extends LogKey case object HASH_MAP_SIZE extends LogKey - case object HAS_R_PACKAGE extends LogKey case object HEARTBEAT extends LogKey case object HEARTBEAT_INTERVAL extends LogKey case object HISTORY_DIR extends LogKey @@ -270,7 +258,6 @@ object LogKeys { case object INCOMPATIBLE_TYPES extends LogKey case object INDEX extends LogKey case object INDEX_FILE extends LogKey - case object INDEX_FILE_NUM extends LogKey case object INDEX_NAME extends LogKey case object INFERENCE_MODE extends LogKey case object INIT extends LogKey @@ -282,7 +269,6 @@ object LogKeys { case object ISOLATION_LEVEL extends LogKey case object ISSUE_DATE extends LogKey case object IS_NETWORK_REQUEST_DONE extends LogKey - case object JAR_ENTRY extends LogKey case object JAR_MESSAGE extends LogKey case object JAR_URL extends LogKey case object JAVA_VERSION extends LogKey @@ -292,8 +278,6 @@ object LogKeys { case object JOIN_CONDITION_SUB_EXPR extends LogKey case object JOIN_TYPE extends LogKey case object K8S_CONTEXT extends LogKey - case object KAFKA_PULLS_COUNT extends LogKey - case object KAFKA_RECORDS_PULLED_COUNT extends LogKey case object KEY extends LogKey case object KEY2 extends LogKey case object KEYTAB extends LogKey @@ -320,11 +304,10 @@ object LogKeys { case object LOGICAL_PLAN_COLUMNS extends LogKey case object LOGICAL_PLAN_LEAVES extends LogKey case object LOG_ID extends LogKey - case object LOG_KEY_FILE extends LogKey case object LOG_OFFSET extends LogKey case object LOG_TYPE extends LogKey case object LOWER_BOUND extends LogKey - case object MALFORMATTED_STIRNG extends LogKey + case object MALFORMATTED_STRING extends LogKey case object MAP_ID extends LogKey case object MASTER_URL extends LogKey 
case object MAX_ATTEMPTS extends LogKey @@ -335,11 +318,11 @@ object LogKeys { case object MAX_EXECUTOR_FAILURES extends LogKey case object MAX_FILE_VERSION extends LogKey case object MAX_JVM_METHOD_PARAMS_LENGTH extends LogKey - case object MAX_LOG_NUM_POLICY extends LogKey case object MAX_MEMORY_SIZE extends LogKey case object MAX_METHOD_CODE_SIZE extends LogKey case object MAX_NUM_BINS extends LogKey case object MAX_NUM_CHUNKS extends LogKey + case object MAX_NUM_LOG_POLICY extends LogKey case object MAX_NUM_PARTITIONS extends LogKey case object MAX_NUM_POSSIBLE_BINS extends LogKey case object MAX_NUM_ROWS_IN_MEMORY_BUFFER extends LogKey @@ -357,17 +340,17 @@ object LogKeys { case object METADATA_JSON extends LogKey case object META_FILE extends LogKey case object METHOD_NAME extends LogKey - case object METHOD_PARAMETER_TYPES extends LogKey + case object METHOD_PARAM_TYPES extends LogKey case object METRICS_JSON extends LogKey case object METRIC_NAME extends LogKey case object MINI_BATCH_FRACTION extends LogKey case object MIN_COMPACTION_BATCH_ID extends LogKey - case object MIN_FREQUENT_PATTERN_COUNT extends LogKey + case object MIN_NUM_FREQUENT_PATTERN extends LogKey case object MIN_POINT_PER_CLUSTER extends LogKey case object MIN_SHARE extends LogKey case object MIN_SIZE extends LogKey case object MIN_TIME extends LogKey - case object MIN_VERSION_NUMBER extends LogKey + case object MIN_VERSION_NUM extends LogKey case object MODEL_WEIGHTS extends LogKey case object MODULE_NAME extends LogKey case object NAMESPACE extends LogKey @@ -383,9 +366,7 @@ object LogKeys { case object NODE_LOCATION extends LogKey case object NON_BUILT_IN_CONNECTORS extends LogKey case object NORM extends LogKey - case object NUM_ADDED_MASTERS extends LogKey case object NUM_ADDED_PARTITIONS extends LogKey - case object NUM_ADDED_WORKERS extends LogKey case object NUM_APPS extends LogKey case object NUM_BIN extends LogKey case object NUM_BLOCK_IDS extends LogKey @@ -398,21 +379,27 @@ 
object LogKeys { case object NUM_BYTES_TO_WARN extends LogKey case object NUM_BYTES_USED extends LogKey case object NUM_CATEGORIES extends LogKey + case object NUM_CHECKSUM_FILE extends LogKey case object NUM_CHUNKS extends LogKey case object NUM_CLASSES extends LogKey case object NUM_COEFFICIENTS extends LogKey case object NUM_COLUMNS extends LogKey case object NUM_CONCURRENT_WRITER extends LogKey case object NUM_CORES extends LogKey + case object NUM_DATA_FILE extends LogKey case object NUM_DATA_FILES extends LogKey case object NUM_DRIVERS extends LogKey case object NUM_DROPPED_PARTITIONS extends LogKey + case object NUM_EFFECTIVE_RULE_OF_RUNS extends LogKey case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey case object NUM_EVENTS extends LogKey case object NUM_EXAMPLES extends LogKey case object NUM_EXECUTOR_CORES extends LogKey case object NUM_EXECUTOR_CORES_REMAINING extends LogKey case object NUM_EXECUTOR_CORES_TOTAL extends LogKey + case object NUM_EXECUTOR_DESIRED extends LogKey + case object NUM_EXECUTOR_LAUNCH extends LogKey + case object NUM_EXECUTOR_TARGET extends LogKey case object NUM_FAILURES extends LogKey case object NUM_FEATURES extends LogKey case object NUM_FILES extends LogKey @@ -420,8 +407,11 @@ object LogKeys { case object NUM_FILES_FAILED_TO_DELETE extends LogKey case object NUM_FILES_REUSED extends LogKey case object NUM_FREQUENT_ITEMS extends LogKey + case object NUM_INDEX_FILE extends LogKey case object NUM_INDEX_FILES extends LogKey case object NUM_ITERATIONS extends LogKey + case object NUM_KAFKA_PULLS extends LogKey + case object NUM_KAFKA_RECORDS_PULLED extends LogKey case object NUM_LEADING_SINGULAR_VALUES extends LogKey case object NUM_LEFT_PARTITION_VALUES extends LogKey case object NUM_LOADED_ENTRIES extends LogKey @@ -432,23 +422,28 @@ object LogKeys { case object NUM_NODES extends LogKey case object NUM_PARTITIONS extends LogKey case object NUM_PARTITIONS2 extends LogKey - case object NUM_PARTITION_VALUES extends LogKey 
case object NUM_PATHS extends LogKey case object NUM_PEERS extends LogKey case object NUM_PEERS_REPLICATED_TO extends LogKey case object NUM_PEERS_TO_REPLICATE_TO extends LogKey case object NUM_PENDING_LAUNCH_TASKS extends LogKey + case object NUM_POD extends LogKey + case object NUM_POD_SHARED_SLOT extends LogKey + case object NUM_POD_TARGET extends LogKey case object NUM_POINT extends LogKey case object NUM_PREFIXES extends LogKey case object NUM_PRUNED extends LogKey + case object NUM_RECORDS_READ extends LogKey case object NUM_REMOVED_WORKERS extends LogKey case object NUM_REPLICAS extends LogKey case object NUM_REQUESTS extends LogKey case object NUM_REQUEST_SYNC_TASK extends LogKey case object NUM_RESOURCE_SLOTS extends LogKey case object NUM_RETRIES extends LogKey + case object NUM_RETRY extends LogKey case object NUM_RIGHT_PARTITION_VALUES extends LogKey case object NUM_ROWS extends LogKey + case object NUM_RULE_OF_RUNS extends LogKey case object NUM_SEQUENCES extends LogKey case object NUM_SLOTS extends LogKey case object NUM_SPILL_INFOS extends LogKey @@ -456,7 +451,9 @@ object LogKeys { case object NUM_SUB_DIRS extends LogKey case object NUM_TASKS extends LogKey case object NUM_TASK_CPUS extends LogKey + case object NUM_TRAIN_WORD extends LogKey case object NUM_VERSIONS_RETAIN extends LogKey + case object NUM_WEIGHTED_EXAMPLES extends LogKey case object OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD extends LogKey case object OBJECT_ID extends LogKey case object OFFSET extends LogKey @@ -477,14 +474,11 @@ object LogKeys { case object OS_NAME extends LogKey case object OS_VERSION extends LogKey case object OUTPUT extends LogKey - case object OUTPUT_LINE extends LogKey - case object OUTPUT_LINE_NUMBER extends LogKey case object OVERHEAD_MEMORY_SIZE extends LogKey case object PAGE_SIZE extends LogKey case object PARSE_MODE extends LogKey case object PARTITIONED_FILE_READER extends LogKey case object PARTITIONER extends LogKey - case object PARTITIONS_SIZE extends 
LogKey case object PARTITION_ID extends LogKey case object PARTITION_SPECIFICATION extends LogKey case object PARTITION_SPECS extends LogKey @@ -494,14 +488,11 @@ object LogKeys { case object PERCENT extends LogKey case object PIPELINE_STAGE_UID extends LogKey case object PLUGIN_NAME extends LogKey - case object POD_COUNT extends LogKey case object POD_ID extends LogKey case object POD_NAME extends LogKey case object POD_NAMESPACE extends LogKey case object POD_PHASE extends LogKey - case object POD_SHARED_SLOT_COUNT extends LogKey case object POD_STATE extends LogKey - case object POD_TARGET_COUNT extends LogKey case object POINT_OF_CENTER extends LogKey case object POLICY extends LogKey case object POOL_NAME extends LogKey @@ -569,7 +560,6 @@ object LogKeys { case object RESULT extends LogKey case object RESULT_SIZE_BYTES extends LogKey case object RESULT_SIZE_BYTES_MAX extends LogKey - case object RETRY_COUNT extends LogKey case object RETRY_INTERVAL extends LogKey case object RETRY_WAIT_TIME extends LogKey case object RIGHT_EXPR extends LogKey @@ -580,15 +570,12 @@ object LogKeys { case object RPC_ADDRESS extends LogKey case object RPC_ENDPOINT_REF extends LogKey case object RPC_MESSAGE_CAPACITY extends LogKey - case object RULE_BATCH_NAME extends LogKey case object RULE_NAME extends LogKey - case object RULE_NUMBER_OF_RUNS extends LogKey case object RUN_ID extends LogKey case object SCALA_VERSION extends LogKey case object SCHEDULER_POOL_NAME extends LogKey case object SCHEMA extends LogKey case object SCHEMA2 extends LogKey - case object SERIALIZE_OUTPUT_LENGTH extends LogKey case object SERVER_NAME extends LogKey case object SERVICE_NAME extends LogKey case object SERVLET_CONTEXT_HANDLER_PATH extends LogKey @@ -598,7 +585,6 @@ object LogKeys { case object SESSION_KEY extends LogKey case object SET_CLIENT_INFO_REQUEST extends LogKey case object SHARD_ID extends LogKey - case object SHELL_COMMAND extends LogKey case object SHELL_OPTIONS extends LogKey case 
object SHORT_USER_NAME extends LogKey case object SHUFFLE_BLOCK_INFO extends LogKey @@ -656,7 +642,6 @@ object LogKeys { case object STREAM_CHUNK_ID extends LogKey case object STREAM_ID extends LogKey case object STREAM_NAME extends LogKey - case object STREAM_SOURCE extends LogKey case object SUBMISSION_ID extends LogKey case object SUBSAMPLING_RATE extends LogKey case object SUB_QUERY extends LogKey @@ -701,14 +686,12 @@ object LogKeys { case object TOPIC_PARTITION_OFFSET_RANGE extends LogKey case object TOTAL extends LogKey case object TOTAL_EFFECTIVE_TIME extends LogKey - case object TOTAL_RECORDS_READ extends LogKey case object TOTAL_TIME extends LogKey case object TOTAL_TIME_READ extends LogKey case object TO_TIME extends LogKey case object TRAINING_SIZE extends LogKey case object TRAIN_VALIDATION_SPLIT_METRIC extends LogKey case object TRAIN_VALIDATION_SPLIT_METRICS extends LogKey - case object TRAIN_WORD_COUNT extends LogKey case object TRANSFER_TYPE extends LogKey case object TREE_NODE extends LogKey case object TRIGGER_INTERVAL extends LogKey @@ -728,7 +711,7 @@ object LogKeys { case object USER_NAME extends LogKey case object UUID extends LogKey case object VALUE extends LogKey - case object VERSION_NUMBER extends LogKey + case object VERSION_NUM extends LogKey case object VIRTUAL_CORES extends LogKey case object VOCAB_SIZE extends LogKey case object WAIT_RESULT_TIME extends LogKey @@ -736,7 +719,6 @@ object LogKeys { case object WATERMARK_CONSTRAINT extends LogKey case object WEB_URL extends LogKey case object WEIGHT extends LogKey - case object WEIGHTED_NUM extends LogKey case object WORKER extends LogKey case object WORKER_HOST extends LogKey case object WORKER_ID extends LogKey diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala index f5bd504aba5c..7e0a356b9e49 100644 --- 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala @@ -22,7 +22,7 @@ import scala.util.control.NonFatal import io.grpc.stub.StreamObserver import org.apache.spark.internal.Logging -import org.apache.spark.internal.LogKeys.{ERROR, POLICY, RETRY_COUNT, RETRY_WAIT_TIME} +import org.apache.spark.internal.LogKeys.{ERROR, NUM_RETRY, POLICY, RETRY_WAIT_TIME} import org.apache.spark.internal.MDC private[sql] class GrpcRetryHandler( @@ -190,7 +190,7 @@ private[sql] object GrpcRetryHandler extends Logging { // retry exception is considered immediately retriable without any policies. logWarning( log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " + - log"retrying (currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)})") + log"retrying (currentRetryNum=${MDC(NUM_RETRY, currentRetryNum)})") return } @@ -201,7 +201,7 @@ private[sql] object GrpcRetryHandler extends Logging { logWarning( log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " + log"retrying (wait=${MDC(RETRY_WAIT_TIME, time.get.toMillis)} ms, " + - log"currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)}, " + + log"currentRetryNum=${MDC(NUM_RETRY, currentRetryNum)}, " + log"policy=${MDC(POLICY, policy.getName)}).") sleep(time.get.toMillis) return @@ -210,7 +210,7 @@ private[sql] object GrpcRetryHandler extends Logging { logWarning( log"Non-Fatal error during RPC execution: ${MDC(ERROR, lastException)}, " + - log"exceeded retries (currentRetryNum=${MDC(RETRY_COUNT, currentRetryNum)})") + log"exceeded retries (currentRetryNum=${MDC(NUM_RETRY, currentRetryNum)})") val error = new RetriesExceeded() exceptionList.foreach(error.addSuppressed) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala index 39c842216cf0..9ac06a41a068 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala @@ -31,7 +31,7 @@ import org.apache.kafka.common.requests.OffsetFetchResponse import org.apache.spark.SparkEnv import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{OFFSETS, RETRY_COUNT, TOPIC_PARTITION_OFFSET} +import org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS, TOPIC_PARTITION_OFFSET} import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset @@ -536,7 +536,7 @@ private[kafka010] class KafkaOffsetReaderAdmin( case NonFatal(e) => lastException = e logWarning( - log"Error in attempt ${MDC(RETRY_COUNT, attempt)} getting Kafka offsets: ", e) + log"Error in attempt ${MDC(NUM_RETRY, attempt)} getting Kafka offsets: ", e) attempt += 1 Thread.sleep(offsetFetchAttemptIntervalMs) resetAdmin() diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala index 17db894f504b..eceedbee1541 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala @@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkEnv import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{OFFSETS, RETRY_COUNT, TOPIC_PARTITION_OFFSET} +import 
org.apache.spark.internal.LogKeys.{NUM_RETRY, OFFSETS, TOPIC_PARTITION_OFFSET} import org.apache.spark.scheduler.ExecutorCacheTaskLocation import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.kafka010.KafkaSourceProvider.StrategyOnNoMatchStartingOffset @@ -613,7 +613,7 @@ private[kafka010] class KafkaOffsetReaderConsumer( case NonFatal(e) => lastException = e logWarning( - log"Error in attempt ${MDC(RETRY_COUNT, attempt)} getting Kafka offsets: ", e) + log"Error in attempt ${MDC(NUM_RETRY, attempt)} getting Kafka offsets: ", e) attempt += 1 Thread.sleep(offsetFetchAttemptIntervalMs) resetConsumer() diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala index 836c0fb13d4e..ceb9d96660ae 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala @@ -393,9 +393,9 @@ private[kafka010] class KafkaDataConsumer( val walTime = System.nanoTime() - startTimestampNano logInfo(log"From Kafka ${MDC(CONSUMER, kafkaMeta)} read " + - log"${MDC(TOTAL_RECORDS_READ, totalRecordsRead)} records through " + - log"${MDC(KAFKA_PULLS_COUNT, numPolls)} polls " + - log"(polled out ${MDC(KAFKA_RECORDS_PULLED_COUNT, numRecordsPolled)} records), " + + log"${MDC(NUM_RECORDS_READ, totalRecordsRead)} records through " + + log"${MDC(NUM_KAFKA_PULLS, numPolls)} polls " + + log"(polled out ${MDC(NUM_KAFKA_RECORDS_PULLED, numRecordsPolled)} records), " + log"taking ${MDC(TOTAL_TIME_READ, totalTimeReadNanos / NANOS_PER_MILLIS.toDouble)} ms, " + log"during time span of ${MDC(TIME, walTime / NANOS_PER_MILLIS.toDouble)} ms." 
) diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 5a2f904f2c65..b391203b4b96 100644 --- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -30,7 +30,7 @@ import com.amazonaws.services.kinesis.model._ import org.apache.spark._ import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{ERROR, RETRY_COUNT} +import org.apache.spark.internal.LogKeys.{ERROR, NUM_RETRY} import org.apache.spark.rdd.{BlockRDD, BlockRDDPartition} import org.apache.spark.storage.BlockId import org.apache.spark.util.NextIterator @@ -279,7 +279,7 @@ class KinesisSequenceRangeIterator( t match { case ptee: ProvisionedThroughputExceededException => logWarning(log"Error while ${MDC(ERROR, message)} " + - log"[attempt = ${MDC(RETRY_COUNT, retryCount + 1)}]", ptee) + log"[attempt = ${MDC(NUM_RETRY, retryCount + 1)}]", ptee) case e: Throwable => throw new SparkException(s"Error while $message", e) } diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala index 45ac90626468..c3d01ec47458 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala @@ -159,7 +159,7 @@ private[r] class RBackendHandler(server: RBackend) log"${MDC(CLASS_NAME, cls)}.${MDC(METHOD_NAME, methodName)}. 
Candidates are:") selectedMethods.foreach { method => logWarning(log"${MDC(METHOD_NAME, methodName)}(" + - log"${MDC(METHOD_PARAMETER_TYPES, method.getParameterTypes.mkString(","))})") + log"${MDC(METHOD_PARAM_TYPES, method.getParameterTypes.mkString(","))})") } throw new Exception(s"No matched method found for $cls.$methodName") } @@ -181,7 +181,7 @@ private[r] class RBackendHandler(server: RBackend) + log"Candidates are:") ctors.foreach { ctor => logWarning(log"${MDC(CLASS_NAME, cls)}(" + - log"${MDC(METHOD_PARAMETER_TYPES, ctor.getParameterTypes.mkString(","))})") + log"${MDC(METHOD_PARAM_TYPES, ctor.getParameterTypes.mkString(","))})") } throw new Exception(s"No matched constructor found for $cls") } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 53b239ddfd79..bfd96167b169 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -1015,7 +1015,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } if (count > 0) { logWarning(log"Fail to clean up according to MAX_LOG_NUM policy " + - log"(${MDC(MAX_LOG_NUM_POLICY, maxNum)}).") + log"(${MDC(MAX_NUM_LOG_POLICY, maxNum)}).") } } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index d9f86d0d0262..84e67cba33a9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -575,7 +575,7 @@ private[deploy] class Master( if (!execs.exists(_.state == ExecutorState.RUNNING)) { logError(log"Application ${MDC(LogKeys.APP_DESC, appInfo.desc.name)} " + log"with ID ${MDC(LogKeys.APP_ID, appInfo.id)} " + - log"failed ${MDC(LogKeys.RETRY_COUNT, appInfo.retryCount)} times; removing it") + 
log"failed ${MDC(LogKeys.NUM_RETRY, appInfo.retryCount)} times; removing it") removeApplication(appInfo, ApplicationState.FAILED) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala index 3461c5218f39..452532df5a2b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala @@ -22,7 +22,7 @@ import scala.util.Random import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{MAX_MEMORY_SIZE, MEMORY_SIZE, NUM_CLASSES, NUM_EXAMPLES, NUM_FEATURES, NUM_NODES, TIMER, WEIGHTED_NUM} +import org.apache.spark.internal.LogKeys.{MAX_MEMORY_SIZE, MEMORY_SIZE, NUM_CLASSES, NUM_EXAMPLES, NUM_FEATURES, NUM_NODES, NUM_WEIGHTED_EXAMPLES, TIMER} import org.apache.spark.ml.classification.DecisionTreeClassificationModel import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.impl.Utils @@ -136,7 +136,7 @@ private[spark] object RandomForest extends Logging with Serializable { logInfo(log"numClasses: ${MDC(NUM_CLASSES, metadata.numClasses)}") logInfo(log"numExamples: ${MDC(NUM_EXAMPLES, metadata.numExamples)}") logInfo(log"weightedNumExamples: " + - log"${MDC(WEIGHTED_NUM, metadata.weightedNumExamples)}") + log"${MDC(NUM_WEIGHTED_EXAMPLES, metadata.weightedNumExamples)}") } timer.start("init") diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 55b78d4bf3bb..867f35a5d2b8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -28,7 +28,7 @@ import org.json4s.DefaultFormats import org.apache.spark.annotation.Since import org.apache.spark.internal.{Logging, MDC} -import 
org.apache.spark.internal.LogKeys.{CROSS_VALIDATION_METRIC, CROSS_VALIDATION_METRICS, ESTIMATOR_PARAMETER_MAP} +import org.apache.spark.internal.LogKeys.{CROSS_VALIDATION_METRIC, CROSS_VALIDATION_METRICS, ESTIMATOR_PARAM_MAP} import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.evaluation.Evaluator import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamValidators} @@ -198,7 +198,7 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String) val (bestMetric, bestIndex) = if (eval.isLargerBetter) metrics.zipWithIndex.maxBy(_._1) else metrics.zipWithIndex.minBy(_._1) - instr.logInfo(log"Best set of parameters:\n${MDC(ESTIMATOR_PARAMETER_MAP, epm(bestIndex))}") + instr.logInfo(log"Best set of parameters:\n${MDC(ESTIMATOR_PARAM_MAP, epm(bestIndex))}") instr.logInfo(log"Best cross-validation metric: ${MDC(CROSS_VALIDATION_METRIC, bestMetric)}.") val bestModel = est.fit(dataset, epm(bestIndex)).asInstanceOf[Model[_]] copyValues(new CrossValidatorModel(uid, bestModel, metrics) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index fa4bcc32de95..8e33ae6aad28 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -29,7 +29,7 @@ import org.json4s.DefaultFormats import org.apache.spark.annotation.Since import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{ESTIMATOR_PARAMETER_MAP, TRAIN_VALIDATION_SPLIT_METRIC, TRAIN_VALIDATION_SPLIT_METRICS} +import org.apache.spark.internal.LogKeys.{ESTIMATOR_PARAM_MAP, TRAIN_VALIDATION_SPLIT_METRIC, TRAIN_VALIDATION_SPLIT_METRICS} import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.evaluation.Evaluator import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators} @@ -174,7 +174,7 @@ class 
TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St val (bestMetric, bestIndex) = if (eval.isLargerBetter) metrics.zipWithIndex.maxBy(_._1) else metrics.zipWithIndex.minBy(_._1) - instr.logInfo(log"Best set of parameters:\n${MDC(ESTIMATOR_PARAMETER_MAP, epm(bestIndex))}") + instr.logInfo(log"Best set of parameters:\n${MDC(ESTIMATOR_PARAM_MAP, epm(bestIndex))}") instr.logInfo(log"Best train validation split metric: " + log"${MDC(TRAIN_VALIDATION_SPLIT_METRIC, bestMetric)}.") val bestModel = est.fit(dataset, epm(bestIndex)).asInstanceOf[Model[_]] diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index d2fc78a7ff7f..499dc09b8621 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -32,7 +32,7 @@ import org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{ALPHA, COUNT, TRAIN_WORD_COUNT, VOCAB_SIZE} +import org.apache.spark.internal.LogKeys.{ALPHA, COUNT, NUM_TRAIN_WORD, VOCAB_SIZE} import org.apache.spark.internal.config.Kryo.KRYO_SERIALIZER_MAX_BUFFER_SIZE import org.apache.spark.ml.linalg.BLAS import org.apache.spark.mllib.linalg.{Vector, Vectors} @@ -210,7 +210,7 @@ class Word2Vec extends Serializable with Logging { a += 1 } logInfo(log"vocabSize = ${MDC(VOCAB_SIZE, vocabSize)}," + - log" trainWordsCount = ${MDC(TRAIN_WORD_COUNT, trainWordsCount)}") + log" trainWordsCount = ${MDC(NUM_TRAIN_WORD, trainWordsCount)}") } private def createExpTable(): Array[Float] = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 539e93867bc2..3c648f34c610 100644 --- 
a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -34,7 +34,7 @@ import org.apache.spark.annotation.Since import org.apache.spark.api.java.JavaRDD import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{MIN_FREQUENT_PATTERN_COUNT, NUM_FREQUENT_ITEMS, NUM_LOCAL_FREQUENT_PATTERN, NUM_PREFIXES, NUM_SEQUENCES} +import org.apache.spark.internal.LogKeys.{MIN_NUM_FREQUENT_PATTERN, NUM_FREQUENT_ITEMS, NUM_LOCAL_FREQUENT_PATTERN, NUM_PREFIXES, NUM_SEQUENCES} import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -142,7 +142,7 @@ class PrefixSpan private ( val totalCount = data.count() logInfo(log"number of sequences: ${MDC(NUM_SEQUENCES, totalCount)}") val minCount = math.ceil(minSupport * totalCount).toLong - logInfo(log"minimum count for a frequent pattern: ${MDC(MIN_FREQUENT_PATTERN_COUNT, minCount)}") + logInfo(log"minimum count for a frequent pattern: ${MDC(MIN_NUM_FREQUENT_PATTERN, minCount)}") // Find frequent items. 
val freqItems = findFrequentItems(data, minCount) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala index 6301638c9a46..135d7e26c6d8 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala @@ -25,7 +25,7 @@ import breeze.linalg.{DenseMatrix => BDM} import org.json4s.jackson.JsonMethods.{parse => parseJson} import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} -import org.apache.spark.internal.LogKeys.MALFORMATTED_STIRNG +import org.apache.spark.internal.LogKeys.MALFORMATTED_STRING import org.apache.spark.internal.MDC import org.apache.spark.internal.config.Kryo._ import org.apache.spark.ml.{linalg => newlinalg} @@ -228,7 +228,7 @@ class VectorsSuite extends SparkFunSuite { malformatted.foreach { s => intercept[SparkException] { Vectors.parse(s) - logInfo(log"Didn't detect malformatted string ${MDC(MALFORMATTED_STIRNG, s)}.") + logInfo(log"Didn't detect malformatted string ${MDC(MALFORMATTED_STRING, s)}.") } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 38303b314082..ef3547fd389f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -400,9 +400,9 @@ class ExecutorPodsAllocator( math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods) logInfo(log"Going to request ${MDC(LogKeys.COUNT, numExecutorsToAllocate)} executors from" + log" Kubernetes for ResourceProfile Id: ${MDC(LogKeys.RESOURCE_PROFILE_ID, rpId)}, " + - 
log"target: ${MDC(LogKeys.POD_TARGET_COUNT, targetNum)}, " + - log"known: ${MDC(LogKeys.POD_COUNT, podCountForRpId)}, sharedSlotFromPendingPods: " + - log"${MDC(LogKeys.POD_SHARED_SLOT_COUNT, sharedSlotFromPendingPods)}.") + log"target: ${MDC(LogKeys.NUM_POD_TARGET, targetNum)}, " + + log"known: ${MDC(LogKeys.NUM_POD, podCountForRpId)}, sharedSlotFromPendingPods: " + + log"${MDC(LogKeys.NUM_POD_SHARED_SLOT, sharedSlotFromPendingPods)}.") requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, k8sKnownPVCNames) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala index 7a6fdef7411b..2728385874f6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala @@ -95,9 +95,9 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging { .partition(_.getName.contains(".checksum")) val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index")) - logInfo(log"Found ${MDC(LogKeys.DATA_FILE_NUM, dataFiles.length)} data files, " + - log"${MDC(LogKeys.INDEX_FILE_NUM, indexFiles.length)} index files, " + - log"and ${MDC(LogKeys.CHECKSUM_FILE_NUM, checksumFiles.length)} checksum files.") + logInfo(log"Found ${MDC(LogKeys.NUM_DATA_FILE, dataFiles.length)} data files, " + + log"${MDC(LogKeys.NUM_INDEX_FILE, indexFiles.length)} index files, " + + log"and ${MDC(LogKeys.NUM_CHECKSUM_FILE, checksumFiles.length)} checksum files.") // Build a hashmap with checksum file name as a key val checksumFileMap = new mutable.HashMap[String, File]() diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 29cf8ccc20f9..c86195d0ef31 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -599,7 +599,7 @@ private[yarn] class YarnAllocator( val numToCancel = math.min(numPendingAllocate, -missing) logInfo(log"Canceling requests for ${MDC(LogKeys.COUNT, numToCancel)} executor " + log"container(s) to have a new desired total " + - log"${MDC(LogKeys.EXECUTOR_DESIRED_COUNT, + log"${MDC(LogKeys.NUM_EXECUTOR_DESIRED, getOrUpdateTargetNumExecutorsForRPId(rpId))} executors.") // cancel pending allocate requests by taking locality preference into account val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel) @@ -707,7 +707,7 @@ private[yarn] class YarnAllocator( runAllocatedContainers(containersToUse) logInfo(log"Received ${MDC(LogKeys.COUNT, allocatedContainers.size)} containers from YARN, " + - log"launching executors on ${MDC(LogKeys.EXECUTOR_LAUNCH_COUNT, containersToUse.size)} " + + log"launching executors on ${MDC(LogKeys.NUM_EXECUTOR_LAUNCH, containersToUse.size)} " + log"of them.") } @@ -819,7 +819,7 @@ private[yarn] class YarnAllocator( } else { logInfo(log"Skip launching executorRunnable as running executors count: " + log"${MDC(LogKeys.COUNT, rpRunningExecs)} reached target executors count: " + - log"${MDC(LogKeys.EXECUTOR_TARGET_COUNT, getOrUpdateTargetNumExecutorsForRPId(rpId))}.") + log"${MDC(LogKeys.NUM_EXECUTOR_TARGET, getOrUpdateTargetNumExecutorsForRPId(rpId))}.") } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala index ea43bebe8ecb..220920a5a319 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import java.lang.reflect.{Method, Modifier} import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{FUNCTION_NAME, FUNCTION_PARAMETER} +import org.apache.spark.internal.LogKeys.{FUNCTION_NAME, FUNCTION_PARAM} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException @@ -136,7 +136,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging { case _: NoSuchFunctionException => val parameterString = args.map(_.dataType.typeName).mkString("(", ", ", ")") logWarning(log"V2 function ${MDC(FUNCTION_NAME, name)} " + - log"with parameter types ${MDC(FUNCTION_PARAMETER, parameterString)} is used in " + + log"with parameter types ${MDC(FUNCTION_PARAM, parameterString)} is used in " + log"partition transforms, but its definition couldn't be found in the function catalog " + log"provided") None diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index c3c105995cd8..0aa01e4f5c51 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -90,14 +90,16 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { def logMetrics(metrics: QueryExecutionMetrics): Unit = { val totalTime = metrics.time / NANOS_PER_MILLIS.toDouble val totalTimeEffective = metrics.timeEffective / NANOS_PER_MILLIS.toDouble + // scalastyle:off line.size.limit val message: MessageWithContext = log""" |=== Metrics of Executed 
Rules === - |Total number of runs: ${MDC(RULE_NUMBER_OF_RUNS, metrics.numRuns)} + |Total number of runs: ${MDC(NUM_RULE_OF_RUNS, metrics.numRuns)} |Total time: ${MDC(TOTAL_TIME, totalTime)} ms - |Total number of effective runs: ${MDC(RULE_NUMBER_OF_RUNS, metrics.numEffectiveRuns)} + |Total number of effective runs: ${MDC(NUM_EFFECTIVE_RULE_OF_RUNS, metrics.numEffectiveRuns)} |Total time of effective runs: ${MDC(TOTAL_EFFECTIVE_TIME, totalTimeEffective)} ms """.stripMargin + // scalastyle:on line.size.limit logBasedOnLevel(message) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 151695192281..088242b4246e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -177,7 +177,7 @@ class RocksDB( assert(version >= 0) acquire(LoadStore) recordedMetrics = None - logInfo(log"Loading ${MDC(LogKeys.VERSION_NUMBER, version)}") + logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)}") try { if (loadedVersion != version) { closeDB() @@ -212,7 +212,7 @@ class RocksDB( if (conf.resetStatsOnLoad) { nativeStats.reset } - logInfo(log"Loaded ${MDC(LogKeys.VERSION_NUMBER, version)}") + logInfo(log"Loaded ${MDC(LogKeys.VERSION_NUM, version)}") } catch { case t: Throwable => loadedVersion = -1 // invalidate loaded data @@ -548,7 +548,7 @@ class RocksDB( val newVersion = loadedVersion + 1 try { - logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUMBER, newVersion)}") + logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}") var compactTimeMs = 0L var flushTimeMs = 0L @@ -556,7 +556,7 @@ class RocksDB( if (shouldCreateSnapshot()) { // Need to flush the change to disk before creating a checkpoint // because rocksdb wal is disabled. 
- logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUMBER, newVersion)}") + logInfo(log"Flushing updates for ${MDC(LogKeys.VERSION_NUM, newVersion)}") flushTimeMs = timeTakenMs { // Flush updates to all available column families assert(!colFamilyNameToHandleMap.isEmpty) @@ -574,7 +574,7 @@ class RocksDB( checkpointTimeMs = timeTakenMs { val checkpointDir = createTempDir("checkpoint") - logInfo(log"Creating checkpoint for ${MDC(LogKeys.VERSION_NUMBER, newVersion)} " + + logInfo(log"Creating checkpoint for ${MDC(LogKeys.VERSION_NUM, newVersion)} " + log"in ${MDC(LogKeys.PATH, checkpointDir)}") // Make sure the directory does not exist. Native RocksDB fails if the directory to // checkpoint exists. @@ -595,7 +595,7 @@ class RocksDB( } } - logInfo(log"Syncing checkpoint for ${MDC(LogKeys.VERSION_NUMBER, newVersion)} to DFS") + logInfo(log"Syncing checkpoint for ${MDC(LogKeys.VERSION_NUM, newVersion)} to DFS") val fileSyncTimeMs = timeTakenMs { if (enableChangelogCheckpointing) { try { @@ -619,7 +619,7 @@ class RocksDB( "fileSync" -> fileSyncTimeMs ) recordedMetrics = Some(metrics) - logInfo(log"Committed ${MDC(LogKeys.VERSION_NUMBER, newVersion)}, " + + logInfo(log"Committed ${MDC(LogKeys.VERSION_NUM, newVersion)}, " + log"stats = ${MDC(LogKeys.METRICS_JSON, recordedMetrics.get.json)}") loadedVersion } catch { @@ -656,7 +656,7 @@ class RocksDB( fileManagerMetrics = fileManager.latestSaveCheckpointMetrics } logInfo(log"${MDC(LogKeys.LOG_ID, loggingId)}: Upload snapshot of version " + - log"${MDC(LogKeys.VERSION_NUMBER, version)}," + + log"${MDC(LogKeys.VERSION_NUM, version)}," + log" time taken: ${MDC(LogKeys.TIME_UNITS, uploadTime)} ms") } finally { localCheckpoint.foreach(_.close()) @@ -676,7 +676,7 @@ class RocksDB( // Make sure changelogWriter gets recreated next time. 
changelogWriter = None release(RollbackStore) - logInfo(log"Rolled back to ${MDC(LogKeys.VERSION_NUMBER, loadedVersion)}") + logInfo(log"Rolled back to ${MDC(LogKeys.VERSION_NUM, loadedVersion)}") } def doMaintenance(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index d8b099a56312..0a460ece2400 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -206,13 +206,13 @@ class RocksDBFileManager( /** Save all the files in given local checkpoint directory as a committed version in DFS */ def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = { logFilesInDir(checkpointDir, log"Saving checkpoint files " + - log"for version ${MDC(LogKeys.VERSION_NUMBER, version)}") + log"for version ${MDC(LogKeys.VERSION_NUM, version)}") val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir) val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles) val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys) val metadataFile = localMetadataFile(checkpointDir) metadata.writeToFile(metadataFile) - logInfo(log"Written metadata for version ${MDC(LogKeys.VERSION_NUMBER, version)}:\n" + + logInfo(log"Written metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" + log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}") if (version <= 1 && numKeys <= 0) { @@ -229,7 +229,7 @@ class RocksDBFileManager( } } zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version)) - logInfo(log"Saved checkpoint file for version ${MDC(LogKeys.VERSION_NUMBER, version)}") + logInfo(log"Saved checkpoint file for version ${MDC(LogKeys.VERSION_NUM, version)}") } /** @@ -239,7 +239,7 @@ class 
RocksDBFileManager( * local directory. */ def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = { - logInfo(log"Loading checkpoint files for version ${MDC(LogKeys.VERSION_NUMBER, version)}") + logInfo(log"Loading checkpoint files for version ${MDC(LogKeys.VERSION_NUM, version)}") // The unique ids of SST files are checked when opening a rocksdb instance. The SST files // in larger versions can't be reused even if they have the same size and name because // they belong to another rocksdb instance. @@ -257,7 +257,7 @@ class RocksDBFileManager( // Copy the necessary immutable files val metadataFile = localMetadataFile(localDir) val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile) - logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUMBER, version)}:\n" + + logInfo(log"Read metadata for version ${MDC(LogKeys.VERSION_NUM, version)}:\n" + log"${MDC(LogKeys.METADATA_JSON, metadata.prettyJson)}") loadImmutableFilesFromDfs(metadata.immutableFiles, localDir) versionToRocksDBFiles.put(version, metadata.immutableFiles) @@ -265,7 +265,7 @@ class RocksDBFileManager( metadata } logFilesInDir(localDir, log"Loaded checkpoint files " + - log"for version ${MDC(LogKeys.VERSION_NUMBER, version)}") + log"for version ${MDC(LogKeys.VERSION_NUM, version)}") metadata } @@ -345,7 +345,7 @@ class RocksDBFileManager( versionsToDelete.foreach { version => try { fm.delete(dfsChangelogFile(version)) - logInfo(log"Deleted changelog file ${MDC(LogKeys.VERSION_NUMBER, version)}") + logInfo(log"Deleted changelog file ${MDC(LogKeys.VERSION_NUM, version)}") } catch { case e: Exception => logWarning( @@ -438,7 +438,7 @@ class RocksDBFileManager( filesToDelete ++= findOrphanFiles(fileToMaxUsedVersion.keys.toSeq, allSstFiles ++ allLogFiles) .map(_ -> -1L) logInfo(log"Deleting ${MDC(LogKeys.NUM_FILES, filesToDelete.size)} " + - log"files not used in versions >= ${MDC(LogKeys.VERSION_NUMBER, minVersionToRetain)}") + log"files not used in versions >= 
${MDC(LogKeys.VERSION_NUM, minVersionToRetain)}") var failedToDelete = 0 filesToDelete.foreach { case (dfsFileName, maxUsedVersion) => try { @@ -477,7 +477,7 @@ class RocksDBFileManager( logInfo(log"Deleted ${MDC(LogKeys.NUM_FILES, filesToDelete.size - failedToDelete)} files " + log"(failed to delete" + log"${MDC(LogKeys.NUM_FILES_FAILED_TO_DELETE, failedToDelete)} files) " + - log"not used in versions >= ${MDC(LogKeys.MIN_VERSION_NUMBER, minVersionToRetain)}") + log"not used in versions >= ${MDC(LogKeys.MIN_VERSION_NUM, minVersionToRetain)}") val changelogVersionsToDelete = changelogFiles .map(_.getName.stripSuffix(".changelog")).map(_.toLong) .filter(_ < minVersionToRetain) @@ -490,7 +490,7 @@ class RocksDBFileManager( localFiles: Seq[File]): Seq[RocksDBImmutableFile] = { // Get the immutable files used in previous versions, as some of those uploaded files can be // reused for this version - logInfo(log"Saving RocksDB files to DFS for ${MDC(LogKeys.VERSION_NUMBER, version)}") + logInfo(log"Saving RocksDB files to DFS for ${MDC(LogKeys.VERSION_NUM, version)}") var bytesCopied = 0L var filesCopied = 0L @@ -528,7 +528,7 @@ class RocksDBFileManager( } logInfo(log"Copied ${MDC(LogKeys.NUM_FILES_COPIED, filesCopied)} files " + log"(${MDC(LogKeys.NUM_BYTES, bytesCopied)} bytes) from local to" + - log" DFS for version ${MDC(LogKeys.VERSION_NUMBER, version)}. " + + log" DFS for version ${MDC(LogKeys.VERSION_NUM, version)}. 
" + log"${MDC(LogKeys.NUM_FILES_REUSED, filesReused)} files reused without copying.") versionToRocksDBFiles.put(version, immutableFiles) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala index 9f568d075cd9..e7fc9f56dd9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala @@ -165,7 +165,7 @@ private[sql] class RocksDBStateStoreProvider verify(state == UPDATING, "Cannot commit after already committed or aborted") val newVersion = rocksDB.commit() state = COMMITTED - logInfo(log"Committed ${MDC(VERSION_NUMBER, newVersion)} " + + logInfo(log"Committed ${MDC(VERSION_NUM, newVersion)} " + log"for ${MDC(STATE_STORE_ID, id)}") newVersion } catch { @@ -176,7 +176,7 @@ private[sql] class RocksDBStateStoreProvider override def abort(): Unit = { verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed") - logInfo(log"Aborting ${MDC(VERSION_NUMBER, version + 1)} " + + logInfo(log"Aborting ${MDC(VERSION_NUM, version + 1)} " + log"for ${MDC(STATE_STORE_ID, id)}") rocksDB.rollback() state = ABORTED @@ -242,7 +242,7 @@ private[sql] class RocksDBStateStoreProvider stateStoreCustomMetrics) } else { logInfo(log"Failed to collect metrics for store_id=${MDC(STATE_STORE_ID, id)} " + - log"and version=${MDC(VERSION_NUMBER, version)}") + log"and version=${MDC(VERSION_NUM, version)}") StateStoreMetrics(0, 0, Map.empty) } } diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java index 9345125a8279..46ee775e8dd4 100644 --- 
a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java +++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/server/HiveServer2.java @@ -145,7 +145,7 @@ public class HiveServer2 extends CompositeService { throw new Error("Max start attempts " + maxAttempts + " exhausted", throwable); } else { LOG.warn("Error starting HiveServer2 on attempt {}, will retry in 60 seconds", - throwable, MDC.of(LogKeys.RETRY_COUNT$.MODULE$, attempts)); + throwable, MDC.of(LogKeys.NUM_RETRY$.MODULE$, attempts)); try { Thread.sleep(60L * 1000L); } catch (InterruptedException e) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 1cd60c0d3fff..2bb2fe970a11 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -232,7 +232,7 @@ private[hive] class HiveClientImpl( caughtException = e logWarning( log"HiveClient got thrift exception, destroying client and retrying " + - log"${MDC(RETRY_COUNT, numTries)} times", e) + log"${MDC(NUM_RETRY, numTries)} times", e) clientLoader.cachedHive = null Thread.sleep(retryDelayMillis) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index a5d22f0bb5bd..bed048c4b5df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{BACKUP_FILE, CHECKPOINT_FILE, CHECKPOINT_TIME, PATH, RETRY_COUNT, TEMP_FILE} +import 
org.apache.spark.internal.LogKeys.{BACKUP_FILE, CHECKPOINT_FILE, CHECKPOINT_TIME, NUM_RETRY, PATH, TEMP_FILE} import org.apache.spark.internal.config.UI._ import org.apache.spark.io.CompressionCodec import org.apache.spark.streaming.scheduler.JobGenerator @@ -288,7 +288,7 @@ class CheckpointWriter( return } catch { case ioe: IOException => - val msg = log"Error in attempt ${MDC(RETRY_COUNT, attempts)} of writing checkpoint " + + val msg = log"Error in attempt ${MDC(NUM_RETRY, attempts)} of writing checkpoint " + log"to '${MDC(CHECKPOINT_FILE, checkpointFile)}'" logWarning(msg, ioe) fs = null diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 6162f4819016..58a6b929a81f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKeys.{RETRY_COUNT, WRITE_AHEAD_LOG_INFO} +import org.apache.spark.internal.LogKeys.{NUM_RETRY, WRITE_AHEAD_LOG_INFO} import org.apache.spark.util.{CompletionIterator, ThreadUtils} import org.apache.spark.util.ArrayImplicits._ @@ -107,7 +107,7 @@ private[streaming] class FileBasedWriteAheadLog( } } if (fileSegment == null) { - logError(log"Failed to write to write ahead log after ${MDC(RETRY_COUNT, failures)} failures") + logError(log"Failed to write to write ahead log after ${MDC(NUM_RETRY, failures)} failures") throw lastException } fileSegment --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org