This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c88fabfee41d [SPARK-47604][CORE] Resource managers: Migrate logInfo with variables to structured logging framework
c88fabfee41d is described below

commit c88fabfee41df1ca4729058450ec6f798641c936
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Tue Apr 23 11:00:44 2024 -0700

    [SPARK-47604][CORE] Resource managers: Migrate logInfo with variables to structured logging framework

    ### What changes were proposed in this pull request?
    The pr aims to migrate `logInfo` in module `Resource managers` with variables to `structured logging framework`.

    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    - Pass GA.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #46130 from panbingkun/SPARK-47604.

    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../scala/org/apache/spark/internal/LogKey.scala   |  45 +++++++-
 .../execution/ExecuteResponseObserver.scala        |   2 +-
 .../deploy/k8s/SparkKubernetesClientFactory.scala  |   9 +-
 .../k8s/submit/KubernetesClientApplication.scala   |   8 +-
 .../deploy/k8s/submit/KubernetesClientUtils.scala  |   4 +-
 .../k8s/submit/LoggingPodStatusWatcher.scala       |  20 ++--
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  25 ++--
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala |   8 +-
 .../scheduler/cluster/k8s/ExecutorRollPlugin.scala |   4 +-
 .../cluster/k8s/KubernetesClusterManager.scala     |   5 +-
 .../k8s/KubernetesClusterSchedulerBackend.scala    |  11 +-
 ...ernetesLocalDiskShuffleExecutorComponents.scala |  21 ++--
 .../spark/deploy/yarn/ApplicationMaster.scala      |  40 ++++---
 .../org/apache/spark/deploy/yarn/Client.scala      |  59 ++++++----
 .../apache/spark/deploy/yarn/ClientArguments.scala |   5 +-
 .../spark/deploy/yarn/ExecutorRunnable.scala       |  29 ++---
 .../spark/deploy/yarn/SparkRackResolver.scala      |   7 +-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 126 ++++++++++++---------
 .../yarn/YarnAllocatorNodeHealthTracker.scala      |  12 +-
 .../cluster/YarnClientSchedulerBackend.scala       |   4 +-
 .../scheduler/cluster/YarnSchedulerBackend.scala   |  17 +--
 21 files changed, 283 insertions(+), 178 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 585373f1782b..b9b0e372a2b0 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -26,9 +26,12 @@ object LogKey extends Enumeration {
   val ACTUAL_PARTITION_COLUMN = Value
   val ALPHA = Value
   val ANALYSIS_ERROR = Value
+  val APP_ATTEMPT_ID = Value
   val APP_DESC = Value
   val APP_ID = Value
+  val APP_NAME = Value
   val APP_STATE = Value
+  val ARGS = Value
   val BACKUP_FILE = Value
   val BATCH_ID = Value
   val BLOCK_ID = Value
@@ -45,6 +48,7 @@ object LogKey extends Enumeration {
   val CATEGORICAL_FEATURES = Value
   val CHECKPOINT_FILE = Value
   val CHECKPOINT_TIME = Value
+  val CHECKSUM_FILE_NUM = Value
   val CLASS_LOADER = Value
   val CLASS_NAME = Value
   val CLUSTER_CENTROIDS = Value
@@ -70,6 +74,7 @@ object LogKey extends Enumeration {
   val CONSUMER = Value
   val CONTAINER = Value
   val CONTAINER_ID = Value
+  val CONTAINER_STATE = Value
   val COST = Value
   val COUNT = Value
   val CROSS_VALIDATION_METRIC = Value
@@ -85,6 +90,7 @@ object LogKey extends Enumeration {
   val DATABASE_NAME = Value
   val
DATAFRAME_CACHE_ENTRY = Value val DATAFRAME_ID = Value + val DATA_FILE_NUM = Value val DATA_SOURCE = Value val DATA_SOURCES = Value val DATA_SOURCE_PROVIDER = Value @@ -113,10 +119,16 @@ object LogKey extends Enumeration { val EXECUTE_INFO = Value val EXECUTE_KEY = Value val EXECUTION_PLAN_LEAVES = Value + val EXECUTOR_DESIRED_COUNT = Value + val EXECUTOR_ENVS = Value val EXECUTOR_ENV_REGEX = Value val EXECUTOR_ID = Value val EXECUTOR_IDS = Value + val EXECUTOR_LAUNCH_COMMANDS = Value + val EXECUTOR_LAUNCH_COUNT = Value + val EXECUTOR_RESOURCES = Value val EXECUTOR_STATE = Value + val EXECUTOR_TARGET_COUNT = Value val EXIT_CODE = Value val EXPECTED_NUM_FILES = Value val EXPECTED_PARTITION_COLUMN = Value @@ -129,8 +141,10 @@ object LogKey extends Enumeration { val FEATURE_COLUMN = Value val FEATURE_DIMENSION = Value val FIELD_NAME = Value + val FILE_ABSOLUTE_PATH = Value val FILE_FORMAT = Value val FILE_FORMAT2 = Value + val FILE_NAME = Value val FILE_VERSION = Value val FINISH_TRIGGER_DURATION = Value val FROM_OFFSET = Value @@ -140,6 +154,7 @@ object LogKey extends Enumeration { val GROUP_ID = Value val HADOOP_VERSION = Value val HASH_JOIN_KEYS = Value + val HEARTBEAT_INTERVAL = Value val HISTORY_DIR = Value val HIVE_CLIENT_VERSION = Value val HIVE_METASTORE_VERSION = Value @@ -149,18 +164,22 @@ object LogKey extends Enumeration { val HOST_PORT = Value val INCOMPATIBLE_TYPES = Value val INDEX = Value + val INDEX_FILE_NUM = Value val INDEX_NAME = Value val INFERENCE_MODE = Value val INITIAL_CAPACITY = Value + val INITIAL_HEARTBEAT_INTERVAL = Value val INIT_MODE = Value val INTERVAL = Value val ISOLATION_LEVEL = Value val JOB_ID = Value val JOIN_CONDITION = Value val JOIN_CONDITION_SUB_EXPR = Value + val K8S_CONTEXT = Value val KAFKA_PULLS_COUNT = Value val KAFKA_RECORDS_PULLED_COUNT = Value val KEY = Value + val KEYTAB = Value val LABEL_COLUMN = Value val LARGEST_CLUSTER_INDEX = Value val LAST_ACCESS_TIME = Value @@ -187,9 +206,11 @@ object LogKey extends Enumeration { val MAX_CATEGORIES = Value val MAX_EXECUTOR_FAILURES = Value val MAX_FILE_VERSION = Value + val MAX_MEMORY_SIZE = Value val MAX_PARTITIONS_SIZE = Value val MAX_SIZE = Value val MAX_TABLE_PARTITION_METADATA_SIZE = Value + val MEMORY_SIZE = Value val MERGE_DIR_NAME = Value val MESSAGE = Value val METHOD_NAME = Value @@ -202,6 +223,8 @@ object LogKey extends Enumeration { val NEW_LABEL_COLUMN_NAME = Value val NEW_PATH = Value val NEW_VALUE = Value + val NODES = Value + val NODE_LOCATION = Value val NORM = Value val NUM_BIN = Value val NUM_CLASSES = Value @@ -226,6 +249,7 @@ object LogKey extends Enumeration { val OPTIONS = Value val OP_ID = Value val OP_TYPE = Value + val OVERHEAD_MEMORY_SIZE = Value val PARSE_MODE = Value val PARTITIONED_FILE_READER = Value val PARTITIONS_SIZE = Value @@ -235,15 +259,21 @@ object LogKey extends Enumeration { val PATH = Value val PATHS = Value val PIPELINE_STAGE_UID = Value + val POD_COUNT = Value val POD_ID = Value val POD_NAME = Value val POD_NAMESPACE = Value val POD_PHASE = Value + val POD_SHARED_SLOT_COUNT = Value + val POD_STATE = Value + val POD_TARGET_COUNT = Value val POLICY = Value val PORT = Value + val PRINCIPAL = Value val PROCESSING_TIME = Value val PRODUCER_ID = Value val PROVIDER = Value + val PVC_METADATA_NAME = Value val QUERY_CACHE_VALUE = Value val QUERY_HINT = Value val QUERY_ID = Value @@ -267,11 +297,16 @@ object LogKey extends Enumeration { val RELATION_NAME = Value val RELATIVE_TOLERANCE = Value val REMAINING_PARTITIONS = Value + val REPORT_DETAILS = Value + val 
RESOURCE = Value val RESOURCE_NAME = Value + val RESOURCE_PROFILE_ID = Value + val RESOURCE_PROFILE_IDS = Value val RETRY_COUNT = Value val RETRY_INTERVAL = Value val RIGHT_EXPR = Value val RMSE = Value + val RPC_ENDPOINT_REF = Value val RULE_BATCH_NAME = Value val RULE_NAME = Value val RULE_NUMBER_OF_RUNS = Value @@ -286,6 +321,7 @@ object LogKey extends Enumeration { val SHUFFLE_BLOCK_INFO = Value val SHUFFLE_ID = Value val SHUFFLE_MERGE_ID = Value + val SHUFFLE_SERVICE_NAME = Value val SIZE = Value val SLEEP_TIME = Value val SLIDE_DURATION = Value @@ -293,6 +329,7 @@ object LogKey extends Enumeration { val SPARK_DATA_STREAM = Value val SPARK_PLAN_ID = Value val SQL_TEXT = Value + val SRC_PATH = Value val STAGE_ID = Value val START_INDEX = Value val STATEMENT_ID = Value @@ -309,6 +346,7 @@ object LogKey extends Enumeration { val SUB_QUERY = Value val TABLE_NAME = Value val TABLE_TYPES = Value + val TARGET_PATH = Value val TASK_ATTEMPT_ID = Value val TASK_ID = Value val TASK_NAME = Value @@ -326,14 +364,15 @@ object LogKey extends Enumeration { val TIMER_LABEL = Value val TIME_UNITS = Value val TIP = Value + val TOKEN_REGEX = Value val TOPIC = Value val TOPIC_PARTITION = Value val TOPIC_PARTITIONS = Value val TOPIC_PARTITION_OFFSET = Value val TOPIC_PARTITION_OFFSET_RANGE = Value + val TOTAL = Value val TOTAL_EFFECTIVE_TIME = Value val TOTAL_RECORDS_READ = Value - val TOTAL_SIZE = Value val TOTAL_TIME = Value val TOTAL_TIME_READ = Value val TO_TIME = Value @@ -343,6 +382,9 @@ object LogKey extends Enumeration { val TRAIN_WORD_COUNT = Value val TREE_NODE = Value val TRIGGER_INTERVAL = Value + val UI_FILTER = Value + val UI_FILTER_PARAMS = Value + val UI_PROXY_BASE = Value val UNSUPPORTED_EXPR = Value val UNSUPPORTED_HINT_REASON = Value val UNTIL_OFFSET = Value @@ -351,6 +393,7 @@ object LogKey extends Enumeration { val USER_ID = Value val USER_NAME = Value val VALUE = Value + val VIRTUAL_CORES = Value val VOCAB_SIZE = Value val WAIT_RESULT_TIME = Value val WAIT_SEND_TIME = Value diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala index 30a899a2ac13..1a6ceffd41c3 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala @@ -246,7 +246,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder: // scalastyle:off line.size.limit logInfo( log"Release all for opId=${MDC(LogKey.OP_ID, executeHolder.operationId)}. 
Execution stats: " + - log"total=${MDC(LogKey.TOTAL_SIZE, totalSize)} " + + log"total=${MDC(LogKey.TOTAL, totalSize)} " + log"autoRemoved=${MDC(LogKey.CACHE_AUTO_REMOVED_SIZE, autoRemovedSize)} " + log"cachedUntilConsumed=${MDC(LogKey.CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed)} " + log"cachedUntilProduced=${MDC(LogKey.CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced)} " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 3763aeadea0e..abb8a43e3732 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -32,7 +32,8 @@ import okhttp3.OkHttpClient import org.apache.spark.SparkConf import org.apache.spark.annotation.{DeveloperApi, Since, Stable} import org.apache.spark.deploy.k8s.Config._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.K8S_CONTEXT import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.util.ThreadUtils @@ -84,9 +85,9 @@ object SparkKubernetesClientFactory extends Logging { // Allow for specifying a context used to auto-configure from the users K8S config file val kubeContext = sparkConf.get(KUBERNETES_CONTEXT).filter(_.nonEmpty) - logInfo("Auto-configuring K8S client using " + - kubeContext.map("context " + _).getOrElse("current context") + - " from users K8S config file") + logInfo(log"Auto-configuring K8S client using " + + log"${MDC(K8S_CONTEXT, kubeContext.map("context " + _).getOrElse("current context"))}" + + log" from users K8S config file") // if backoff limit is not set then set it to 3 if (getSystemPropertyOrEnvVar(KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY) == null) { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 662f5ddbd4a7..c1cef2b84c71 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -31,7 +31,8 @@ import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{APP_ID, APP_NAME, SUBMISSION_ID} import org.apache.spark.util.Utils /** @@ -203,8 +204,9 @@ private[spark] class Client( } } } else { - logInfo(s"Deployed Spark application ${conf.appName} with application ID ${conf.appId} " + - s"and submission ID $sId into Kubernetes") + logInfo(log"Deployed Spark application ${MDC(APP_NAME, conf.appName)} with " + + log"application ID ${MDC(APP_ID, conf.appId)} and " + + log"submission ID ${MDC(SUBMISSION_ID, sId)} into Kubernetes") } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala index beb7ff6bfe22..51f6a2079bf7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientUtils.scala @@ -141,8 +141,8 @@ private[spark] object KubernetesClientUtils extends Logging { } } if (truncatedMap.nonEmpty) { - logInfo(s"Spark configuration files loaded from $confDir :" + - s" ${truncatedMap.keys.mkString(",")}") + logInfo(log"Spark configuration files loaded from ${MDC(PATH, confDir)} : " + + log"${MDC(PATHS, truncatedMap.keys.mkString(","))}") } if (skippedFiles.nonEmpty) { logWarning(log"Skipped conf file(s) ${MDC(PATHS, skippedFiles.mkString(","))}, due to " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala index 3227a72a8371..2c6fb8079a17 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala @@ -23,7 +23,8 @@ import io.fabric8.kubernetes.client.Watcher.Action import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.KubernetesDriverConf import org.apache.spark.deploy.k8s.KubernetesUtils._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{APP_ID, APP_NAME, POD_PHASE, POD_STATE, STATUS, SUBMISSION_ID} private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { def watchOrStop(submissionId: String): Boolean @@ -83,7 +84,8 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) } private def logLongStatus(): Unit = { - logInfo("State changed, new state: " + pod.map(formatPodState).getOrElse("unknown")) + logInfo(log"State changed, new state: " + + log"${MDC(POD_STATE, pod.map(formatPodState).getOrElse("unknown"))}") } private def hasCompleted(): Boolean = { @@ -96,22 +98,22 @@ private[k8s] class LoggingPodStatusWatcherImpl(conf: KubernetesDriverConf) } override def watchOrStop(sId: String): Boolean = { - logInfo(s"Waiting for application ${conf.appName} with application ID $appId " + - s"and submission ID $sId to finish...") + logInfo(log"Waiting for application ${MDC(APP_NAME, conf.appName)}} with application ID " + + log"${MDC(APP_ID, appId)} and submission ID ${MDC(SUBMISSION_ID, sId)} to finish...") val interval = conf.get(REPORT_INTERVAL) synchronized { while (!podCompleted && !resourceTooOldReceived) { wait(interval) - logInfo(s"Application status for $appId (phase: $phase)") + logInfo(log"Application status for ${MDC(APP_ID, appId)} (phase: ${MDC(POD_PHASE, phase)})") } } if(podCompleted) { logInfo( - pod.map { p => s"Container final statuses:\n\n${containersDescription(p)}" } - .getOrElse("No containers were found in the driver pod.")) - logInfo(s"Application ${conf.appName} with application ID $appId " + - s"and submission ID $sId finished") + pod.map { p => log"Container final statuses:\n\n${MDC(STATUS, containersDescription(p))}" } + .getOrElse(log"No containers were found in the driver pod.")) + logInfo(log"Application ${MDC(APP_NAME, conf.appName)} with application ID " + + 
log"${MDC(APP_ID, appId)} and submission ID ${MDC(SUBMISSION_ID, sId)} finished") } podCompleted } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index a48e1fba9954..eb70a509d1e7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -343,7 +343,8 @@ class ExecutorPodsAllocator( val toDelete = newlyCreatedToDelete ++ pendingToDelete if (toDelete.nonEmpty) { - logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).") + logInfo(log"Deleting ${MDC(LogKey.COUNT, toDelete.size)} excess pod requests " + + log"(${MDC(LogKey.RESOURCE_PROFILE_IDS, toDelete.mkString(","))}).") _deletedExecutorIds = _deletedExecutorIds ++ toDelete Utils.tryLogNonFatalError { @@ -397,9 +398,11 @@ class ExecutorPodsAllocator( val numMissingPodsForRpId = targetNum - podCountForRpId val numExecutorsToAllocate = math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods) - logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " + - s"ResourceProfile Id: $rpId, target: $targetNum, known: $podCountForRpId, " + - s"sharedSlotFromPendingPods: $sharedSlotFromPendingPods.") + logInfo(log"Going to request ${MDC(LogKey.COUNT, numExecutorsToAllocate)} executors from " + + log"Kubernetes for ResourceProfile Id: ${MDC(LogKey.RESOURCE_PROFILE_ID, rpId)}, " + + log"target: ${MDC(LogKey.POD_TARGET_COUNT, targetNum)}, " + + log"known: ${MDC(LogKey.POD_COUNT, podCountForRpId)}, sharedSlotFromPendingPods: " + + log"${MDC(LogKey.POD_SHARED_SLOT_COUNT, sharedSlotFromPendingPods)}.") requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, k8sKnownPVCNames) } } @@ -428,7 +431,8 @@ class ExecutorPodsAllocator( .filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName)) .filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli > podAllocationDelay) - logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs") + logInfo(log"Found ${MDC(LogKey.COUNT, reusablePVCs.size)} reusable PVCs from " + + log"${MDC(LogKey.TOTAL, createdPVCs.size)} PVCs") reusablePVCs } catch { case _: KubernetesClientException => @@ -449,7 +453,8 @@ class ExecutorPodsAllocator( val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse) for ( _ <- 0 until numExecutorsToAllocate) { if (reusablePVCs.isEmpty && podAllocOnPVC && maxPVCs <= PVC_COUNTER.get()) { - logInfo(s"Wait to reuse one of the existing ${PVC_COUNTER.get()} PVCs.") + logInfo( + log"Wait to reuse one of the existing ${MDC(LogKey.COUNT, PVC_COUNTER.get())} PVCs.") return } val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet() @@ -480,8 +485,9 @@ class ExecutorPodsAllocator( addOwnerReference(driverPod.get, Seq(resource)) } val pvc = resource.asInstanceOf[PersistentVolumeClaim] - logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " + - s"StorageClass ${pvc.getSpec.getStorageClassName}") + logInfo(log"Trying to create PersistentVolumeClaim " + + log"${MDC(LogKey.PVC_METADATA_NAME, pvc.getMetadata.getName)} with " + + log"StorageClass ${MDC(LogKey.CLASS_NAME, pvc.getSpec.getStorageClassName)}") 
kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create() PVC_COUNTER.incrementAndGet() } @@ -519,7 +525,8 @@ class ExecutorPodsAllocator( if (volume.nonEmpty) { val matchedPVC = reusablePVCs.remove(index) replacedResources.add(pvc) - logInfo(s"Reuse PersistentVolumeClaim ${matchedPVC.getMetadata.getName}") + logInfo(log"Reuse PersistentVolumeClaim " + + log"${MDC(LogKey.PVC_METADATA_NAME, matchedPVC.getMetadata.getName)}") volume.get.getPersistentVolumeClaim.setClaimName(matchedPVC.getMetadata.getName) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index 5590311bf661..212659d48be8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -30,7 +30,8 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.KubernetesUtils._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.EXECUTOR_ID import org.apache.spark.scheduler.ExecutorExited import org.apache.spark.util.Utils @@ -99,8 +100,9 @@ private[spark] class ExecutorPodsLifecycleManager( if (onFinalNonDeletedState(succeeded, execId, schedulerBackend, deleteFromK8s)) { execIdsRemovedInThisRound += execId if (schedulerBackend.isExecutorActive(execId.toString)) { - logInfo(s"Snapshot reported succeeded executor with id $execId, " + - "even though the application has not requested for it to be removed.") + logInfo(log"Snapshot reported succeeded executor with id " + + log"${MDC(EXECUTOR_ID, execId)}, even though the application has not " + + log"requested for it to be removed.") } else { logDebug(s"Snapshot reported succeeded executor with id $execId," + s" pod name ${state.pod.getMetadata.getName}.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala index 1c0de8e2afde..2a09aebbeb20 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala @@ -27,7 +27,7 @@ import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, import org.apache.spark.deploy.k8s.Config.{EXECUTOR_ROLL_INTERVAL, EXECUTOR_ROLL_POLICY, ExecutorRollPolicy, MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING} import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{CLASS_NAME, CONFIG, INTERVAL} +import org.apache.spark.internal.LogKey.{CLASS_NAME, CONFIG, EXECUTOR_ID, INTERVAL} import org.apache.spark.internal.config.DECOMMISSION_ENABLED import org.apache.spark.scheduler.ExecutorDecommissionInfo import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_SECOND @@ -82,7 +82,7 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging { choose(executorSummaryList, 
policy) match { case Some(id) => // Use decommission to be safe. - logInfo(s"Ask to decommission executor $id") + logInfo(log"Ask to decommission executor ${MDC(EXECUTOR_ID, id)}") val now = System.currentTimeMillis() scheduler.decommissionExecutor( id, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 3235d922204b..26355b749d54 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -25,7 +25,8 @@ import org.apache.spark.{SparkConf, SparkContext, SparkMasterRegex} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.MASTER_URL import org.apache.spark.internal.config.TASK_MAX_FAILURES import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} import org.apache.spark.scheduler.local.LocalSchedulerBackend @@ -61,7 +62,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit if (threads == "*") localCpuCount else threads.toInt case _ => 1 } - logInfo(s"Running Spark with ${sc.conf.get(KUBERNETES_DRIVER_MASTER_URL)}") + logInfo(log"Running Spark with ${MDC(MASTER_URL, sc.conf.get(KUBERNETES_DRIVER_MASTER_URL))}") val schedulerImpl = scheduler.asInstanceOf[TaskSchedulerImpl] // KubernetesClusterSchedulerBackend respects `spark.app.id` while LocalSchedulerBackend // does not. Propagate `spark.app.id` via `spark.test.appId` to match the behavior. 
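The Kubernetes hunks above and the YARN hunks that follow all apply the same mechanical rewrite. A minimal, self-contained sketch of that pattern is shown below, using only the Logging, MDC and LogKey APIs imported in this diff; PodReporter and its parameters are hypothetical names for illustration and are not part of the commit.

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.{APP_ID, COUNT}

    class PodReporter extends Logging {
      def report(appId: String, podCount: Int): Unit = {
        // Before: an s-interpolated string; the values are flattened into plain text.
        logInfo(s"Application $appId is running $podCount pods")

        // After: the log interpolator builds a structured message, and each MDC tags
        // its value with an enumerated LogKey so it can also be emitted as a field.
        // Longer messages are split into several log"..." fragments joined with +,
        // exactly as in the hunks above.
        logInfo(log"Application ${MDC(APP_ID, appId)} is running " +
          log"${MDC(COUNT, podCount)} pods")
      }
    }
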
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index daf8d5e3f58a..8a8278b9d076 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -32,6 +32,8 @@ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils import org.apache.spark.deploy.security.HadoopDelegationTokenManager +import org.apache.spark.internal.LogKey.{COUNT, HOST_PORT, TOTAL} +import org.apache.spark.internal.MDC import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.{RpcAddress, RpcCallContext} @@ -255,9 +257,10 @@ private[spark] class KubernetesClusterSchedulerBackend( .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*) - if (!running.list().getItems().isEmpty()) { - logInfo(s"Forcefully deleting ${running.list().getItems().size()} pods " + - s"(out of ${executorIds.size}) that are still running after graceful shutdown period.") + if (!running.list().getItems.isEmpty) { + logInfo(log"Forcefully deleting ${MDC(COUNT, running.list().getItems.size())} pods " + + log"(out of ${MDC(TOTAL, executorIds.size)}) that are still running after graceful " + + log"shutdown period.") running.delete() } } @@ -353,7 +356,7 @@ private[spark] class KubernetesClusterSchedulerBackend( execIDRequester -= rpcAddress // Expected, executors re-establish a connection with an ID case _ => - logInfo(s"No executor found for ${rpcAddress}") + logInfo(log"No executor found for ${MDC(HOST_PORT, rpcAddress)}") } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala index 376218df5770..8acd2b5f2aba 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala @@ -27,7 +27,7 @@ import org.apache.commons.io.FileExistsException import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.deploy.k8s.Config.KUBERNETES_DRIVER_REUSE_PVC -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKey, MDC} import org.apache.spark.internal.config.{SHUFFLE_CHECKSUM_ALGORITHM, SHUFFLE_CHECKSUM_ENABLED} import org.apache.spark.shuffle.ShuffleChecksumUtils.{compareChecksums, getChecksumFileName} import org.apache.spark.shuffle.api.{ShuffleExecutorComponents, ShuffleMapOutputWriter, SingleSpillShuffleMapOutputWriter} @@ -54,7 +54,8 @@ class KubernetesLocalDiskShuffleExecutorComponents(sparkConf: SparkConf) KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, blockManager) } } else { - logInfo(s"Skip recovery because ${KUBERNETES_DRIVER_REUSE_PVC.key} is disabled.") + logInfo(log"Skip recovery because 
${MDC(LogKey.CONFIG, KUBERNETES_DRIVER_REUSE_PVC.key)} " + + log"is disabled.") } } @@ -94,20 +95,23 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging { .partition(_.getName.contains(".checksum")) val (indexFiles, dataFiles) = files.partition(_.getName.endsWith(".index")) - logInfo(s"Found ${dataFiles.size} data files, ${indexFiles.size} index files, " + - s"and ${checksumFiles.size} checksum files.") + logInfo(log"Found ${MDC(LogKey.DATA_FILE_NUM, dataFiles.length)} data files, " + + log"${MDC(LogKey.INDEX_FILE_NUM, indexFiles.length)} index files, " + + log"and ${MDC(LogKey.CHECKSUM_FILE_NUM, checksumFiles.length)} checksum files.") // Build a hashmap with checksum file name as a key val checksumFileMap = new mutable.HashMap[String, File]() val algorithm = conf.get(SHUFFLE_CHECKSUM_ALGORITHM) checksumFiles.foreach { f => - logInfo(s"${f.getName} -> ${f.getAbsolutePath}") + logInfo(log"${MDC(LogKey.FILE_NAME, f.getName)} -> " + + log"${MDC(LogKey.FILE_ABSOLUTE_PATH, f.getAbsolutePath)}") checksumFileMap.put(f.getName, f) } // Build a hashmap with shuffle data file name as a key val indexFileMap = new mutable.HashMap[String, File]() indexFiles.foreach { f => - logInfo(s"${f.getName.replace(".index", ".data")} -> ${f.getAbsolutePath}") + logInfo(log"${MDC(LogKey.FILE_NAME, f.getName.replace(".index", ".data"))} -> " + + log"${MDC(LogKey.FILE_ABSOLUTE_PATH, f.getAbsolutePath)}") indexFileMap.put(f.getName.replace(".index", ".data"), f) } @@ -116,7 +120,7 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging { val level = StorageLevel.DISK_ONLY val checksumDisabled = !conf.get(SHUFFLE_CHECKSUM_ENABLED) (dataFiles ++ indexFiles).foreach { f => - logInfo(s"Try to recover ${f.getAbsolutePath}") + logInfo(log"Try to recover ${MDC(LogKey.FILE_ABSOLUTE_PATH, f.getAbsolutePath)}") try { val id = BlockId(f.getName) // To make it sure to handle only shuffle blocks @@ -129,7 +133,8 @@ object KubernetesLocalDiskShuffleExecutorComponents extends Logging { val decryptedSize = f.length() bm.TempFileBasedBlockStoreUpdater(id, level, classTag, f, decryptedSize).save() } else { - logInfo(s"Ignore ${f.getAbsolutePath} due to the verification failure.") + logInfo(log"Ignore ${MDC(LogKey.FILE_ABSOLUTE_PATH, f.getAbsolutePath)} " + + log"due to the verification failure.") } } else { logInfo("Ignore a non-shuffle block file.") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index eb944244fc9d..19e918c4ff2a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -43,8 +43,7 @@ import org.apache.spark.deploy.{ExecutorFailureTracker, SparkHadoopUtil} import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{EXIT_CODE, FAILURES, HOST_PORT} +import org.apache.spark.internal.{Logging, LogKey, MDC} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} @@ -220,7 +219,7 @@ private[spark] class ApplicationMaster( "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT), 
Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext() - logInfo("ApplicationAttemptId: " + appAttemptId) + logInfo(log"ApplicationAttemptId: ${MDC(LogKey.APP_ATTEMPT_ID, appAttemptId)}") // During shutdown, we may not be able to create an FileSystem object. So, pre-create here. val stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) @@ -368,8 +367,9 @@ private[spark] class ApplicationMaster( final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = { synchronized { if (registered && !unregistered) { - logInfo(s"Unregistering ApplicationMaster with $status" + - Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) + logInfo(log"Unregistering ApplicationMaster with ${MDC(LogKey.APP_STATE, status)}" + + Option(diagnostics).map( + msg => log" (diag message: ${MDC(LogKey.MESSAGE, msg)})").getOrElse(log"")) unregistered = true client.unregister(status, Option(diagnostics).getOrElse("")) } @@ -387,8 +387,9 @@ private[spark] class ApplicationMaster( finalStatus = FinalApplicationStatus.FAILED exitCode = ApplicationMaster.EXIT_SC_NOT_INITED } - logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" + - Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) + logInfo(log"Final app status: ${MDC(LogKey.APP_STATE, finalStatus)}, " + + log"exitCode: ${MDC(LogKey.EXIT_CODE, exitCode)}" + + Option(msg).map(msg => log", (reason: ${MDC(LogKey.REASON, msg)})").getOrElse(log"")) finalMsg = ComStrUtils.abbreviate(msg, sparkConf.get(AM_FINAL_MSG_LIMIT).toInt) finished = true if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { @@ -481,8 +482,8 @@ private[spark] class ApplicationMaster( // the allocator is ready to service requests. rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef)) if (_sparkConf.get(SHUFFLE_SERVICE_ENABLED)) { - logInfo("Initializing service data for shuffle service using name '" + - s"${_sparkConf.get(SHUFFLE_SERVICE_NAME)}'") + logInfo(log"Initializing service data for shuffle service using name '" + + log"${MDC(LogKey.SHUFFLE_SERVICE_NAME, _sparkConf.get(SHUFFLE_SERVICE_NAME))}'") } allocator.allocateResources() val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER, sparkConf) @@ -598,7 +599,7 @@ private[spark] class ApplicationMaster( s"$failureCount time(s) from Reporter thread.") } else { logWarning( - log"Reporter thread fails ${MDC(FAILURES, failureCount)} time(s) in a row.", e) + log"Reporter thread fails ${MDC(LogKey.FAILURES, failureCount)} time(s) in a row.", e) } } try { @@ -656,8 +657,9 @@ private[spark] class ApplicationMaster( t.setDaemon(true) t.setName("Reporter") t.start() - logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " + - s"initial allocation : $initialAllocationInterval) intervals") + logInfo(log"Started progress reporter thread with " + + log"(heartbeat: ${MDC(LogKey.HEARTBEAT_INTERVAL, heartbeatInterval)}, initial allocation: " + + log"${MDC(LogKey.INITIAL_HEARTBEAT_INTERVAL, initialAllocationInterval)}) intervals") t } @@ -683,7 +685,7 @@ private[spark] class ApplicationMaster( try { val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES) if (!preserveFiles) { - logInfo("Deleting staging directory " + stagingDirPath) + logInfo(log"Deleting staging directory ${MDC(LogKey.PATH, stagingDirPath)}") fs.delete(stagingDirPath, true) } } catch { @@ -748,7 +750,7 @@ private[spark] class ApplicationMaster( // Reporter thread can interrupt to stop user class 
case SparkUserAppException(exitCode) => val msg = log"User application exited with status " + - log"${MDC(EXIT_CODE, exitCode)}" + log"${MDC(LogKey.EXIT_CODE, exitCode)}" logError(msg) finish(FinalApplicationStatus.FAILED, exitCode, msg.message) case cause: Throwable => @@ -831,7 +833,8 @@ private[spark] class ApplicationMaster( } case KillExecutors(executorIds) => - logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.") + logInfo(log"Driver requested to kill executor(s) " + + log"${MDC(LogKey.EXECUTOR_IDS, executorIds.mkString(", "))}.") Option(allocator) match { case Some(a) => executorIds.foreach(a.killExecutor) case None => logWarning("Container allocator is not ready to kill executors yet.") @@ -854,11 +857,12 @@ private[spark] class ApplicationMaster( if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) { if (shutdown || !clientModeTreatDisconnectAsFailed) { if (exitCode == 0) { - logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress") + logInfo(log"Driver terminated or disconnected! Shutting down. " + + log"${MDC(LogKey.HOST_PORT, remoteAddress)}") finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) } else { - logError(log"Driver terminated with exit code ${MDC(EXIT_CODE, exitCode)}! " + - log"Shutting down. ${MDC(HOST_PORT, remoteAddress)}") + logError(log"Driver terminated with exit code ${MDC(LogKey.EXIT_CODE, exitCode)}! " + + log"Shutting down. ${MDC(LogKey.HOST_PORT, remoteAddress)}") finish(FinalApplicationStatus.FAILED, exitCode) } } else { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index bed7c859003a..214a79559755 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -56,7 +56,6 @@ import org.apache.spark.deploy.yarn.ResourceRequestHelper._ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.{Logging, LogKey, MDC} -import org.apache.spark.internal.LogKey.{APP_ID, CONFIG, CONFIG2, PATH} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python._ import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils} @@ -142,7 +141,8 @@ private[spark] class Client( val principal = sparkConf.get(PRINCIPAL).orNull require((principal == null) == (keytab == null), "Both principal and keytab must be defined, or neither.") - logInfo(s"Kerberos credentials: principal = $principal, keytab = $keytab") + logInfo(log"Kerberos credentials: principal = ${MDC(LogKey.PRINCIPAL, principal)}, " + + log"keytab = ${MDC(LogKey.KEYTAB, keytab)}") // Generate a file name that can be used for the keytab file, that does not conflict // with any user file. 
Some(new File(keytab).getName() + "-" + UUID.randomUUID().toString) @@ -229,7 +229,7 @@ private[spark] class Client( val appContext = createApplicationSubmissionContext(newApp, containerContext) // Finally, submit and monitor the application - logInfo(s"Submitting application $appId to ResourceManager") + logInfo(log"Submitting application ${MDC(LogKey.APP_ID, appId)} to ResourceManager") yarnClient.submitApplication(appContext) launcherBackend.setAppId(appId.toString) reportLauncherState(SparkAppHandle.State.SUBMITTED) @@ -254,11 +254,11 @@ private[spark] class Client( try { val fs = stagingDirPath.getFileSystem(hadoopConf) if (fs.delete(stagingDirPath, true)) { - logInfo(s"Deleted staging directory $stagingDirPath") + logInfo(log"Deleted staging directory ${MDC(LogKey.PATH, stagingDirPath)}") } } catch { case ioe: IOException => - logWarning(log"Failed to cleanup staging dir ${MDC(PATH, stagingDirPath)}", ioe) + logWarning(log"Failed to cleanup staging dir ${MDC(LogKey.PATH, stagingDirPath)}", ioe) } } @@ -332,7 +332,7 @@ private[spark] class Client( appContext.setLogAggregationContext(logAggregationContext) } catch { case NonFatal(e) => - logWarning(log"Ignoring ${MDC(CONFIG, ROLLED_LOG_INCLUDE_PATTERN.key)}} " + + logWarning(log"Ignoring ${MDC(LogKey.CONFIG, ROLLED_LOG_INCLUDE_PATTERN.key)}} " + log"because the version of YARN does not support it", e) } } @@ -371,14 +371,16 @@ private[spark] class Client( // SPARK-37205: this regex is used to grep a list of configurations and send them to YARN RM // for fetching delegation tokens. See YARN-5910 for more details. sparkConf.get(config.AM_TOKEN_CONF_REGEX).foreach { regex => - logInfo(s"Processing token conf (spark.yarn.am.tokenConfRegex) with regex $regex") + logInfo(log"Processing token conf (spark.yarn.am.tokenConfRegex) with " + + log"regex ${MDC(LogKey.TOKEN_REGEX, regex)}") val dob = new DataOutputBuffer() val copy = new Configuration(false) copy.clear() hadoopConf.asScala.foreach { entry => if (entry.getKey.matches(regex)) { copy.set(entry.getKey, entry.getValue) - logInfo(s"Captured key: ${entry.getKey} -> value: ${entry.getValue}") + logInfo(log"Captured key: ${MDC(LogKey.KEY, entry.getKey)} -> " + + log"value: ${MDC(LogKey.VALUE, entry.getValue)}") } } copy.write(dob); @@ -403,8 +405,8 @@ private[spark] class Client( */ private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = { val maxMem = newAppResponse.getMaximumResourceCapability.getMemorySize - logInfo("Verifying our application has not requested more than the maximum " + - s"memory capability of the cluster ($maxMem MB per container)") + logInfo(log"Verifying our application has not requested more than the maximum memory " + + log"capability of the cluster (${MDC(LogKey.MAX_MEMORY_SIZE, maxMem)} MB per container)") val executorMem = executorMemory + executorOffHeapMemory + executorMemoryOverhead + pysparkWorkerMemory if (executorMem > maxMem) { @@ -421,9 +423,8 @@ private[spark] class Client( "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " + "'yarn.nodemanager.resource.memory-mb'.") } - logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format( - amMem, - amMemoryOverhead)) + logInfo(log"Will allocate AM container, with ${MDC(LogKey.MEMORY_SIZE, amMem)} MB memory " + + log"including ${MDC(LogKey.OVERHEAD_MEMORY_SIZE, amMemoryOverhead)} MB overhead") // We could add checks to make sure the entire cluster has enough resources but that involves // getting all the node reports and computing 
ourselves. @@ -447,7 +448,8 @@ private[spark] class Client( var destPath = srcPath if (force || !compareFs(srcFs, destFs) || "file".equals(srcFs.getScheme)) { destPath = new Path(destDir, destName.getOrElse(srcPath.getName())) - logInfo(s"Uploading resource $srcPath -> $destPath") + logInfo(log"Uploading resource ${MDC(LogKey.SRC_PATH, srcPath)} -> " + + log"${MDC(LogKey.TARGET_PATH, destPath)}") try { FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) } catch { @@ -458,7 +460,8 @@ private[spark] class Client( replication.foreach(repl => destFs.setReplication(destPath, repl)) destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION)) } else { - logInfo(s"Source and destination file systems are the same. Not copying $srcPath") + logInfo(log"Source and destination file systems are the same. " + + log"Not copying ${MDC(LogKey.SRC_PATH, srcPath)}") } // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific // version shows the specific version in the distributed cache configuration @@ -701,8 +704,9 @@ private[spark] class Client( case None => // No configuration, so fall back to uploading local jar files. logWarning( - log"Neither ${MDC(CONFIG, SPARK_JARS.key)} nor ${MDC(CONFIG2, SPARK_ARCHIVE.key)}} " + - log"is set, falling back to uploading libraries under SPARK_HOME.") + log"Neither ${MDC(LogKey.CONFIG, SPARK_JARS.key)} nor " + + log"${MDC(LogKey.CONFIG2, SPARK_ARCHIVE.key)}} is set, falling back to uploading " + + log"libraries under SPARK_HOME.") val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir( sparkConf.getenv("SPARK_HOME"))) val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip", @@ -881,7 +885,7 @@ private[spark] class Client( if (dir.isDirectory()) { val files = dir.listFiles() if (files == null) { - logWarning(log"Failed to list files under directory ${MDC(PATH, dir)}") + logWarning(log"Failed to list files under directory ${MDC(LogKey.PATH, dir)}") } else { files.foreach { file => if (file.isFile && !hadoopConfFiles.contains(file.getName())) { @@ -1070,7 +1074,8 @@ private[spark] class Client( sparkConf)) } if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) { - logWarning(log"${MDC(CONFIG, AM_JAVA_OPTIONS.key)} will not take effect in cluster mode") + logWarning(log"${MDC(LogKey.CONFIG, AM_JAVA_OPTIONS.key)} will not take effect " + + log"in cluster mode") } } else { // Validate and include yarn am specific java options in yarn-client mode. 
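Each new placeholder in these hunks is backed by a key added to the LogKey enumeration at the top of this commit, for example SRC_PATH and TARGET_PATH in the "Uploading resource" message above. The sketch below is a hypothetical, self-contained usage of those keys, assuming only the enumeration and the MDC wrapper shown in the diff; UploadReporter and copyNotice are invented names.

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.{SRC_PATH, TARGET_PATH}

    object UploadReporter extends Logging {
      def copyNotice(src: String, dest: String): Unit = {
        // MDC pairs each interpolated value with its enumerated key, so the value is
        // rendered in the message text and is also available as a structured field.
        logInfo(log"Uploading resource ${MDC(SRC_PATH, src)} -> ${MDC(TARGET_PATH, dest)}")
      }
    }
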
@@ -1202,7 +1207,7 @@ private[spark] class Client( getApplicationReport() } catch { case e: ApplicationNotFoundException => - logError(log"Application ${MDC(APP_ID, appId)} not found.") + logError(log"Application ${MDC(LogKey.APP_ID, appId)} not found.") cleanupStagingDir() return YarnAppReport(YarnApplicationState.KILLED, FinalApplicationStatus.KILLED, None) case NonFatal(e) if !e.isInstanceOf[InterruptedIOException] => @@ -1216,7 +1221,8 @@ private[spark] class Client( reportsSinceLastLog += 1 if (logApplicationReport) { if (lastState != state || reportsSinceLastLog >= reportsTillNextLog) { - logInfo(s"Application report for $appId (state: $state)") + logInfo(log"Application report for ${MDC(LogKey.APP_ID, appId)} " + + log"(state: ${MDC(LogKey.APP_STATE, state)})") reportsSinceLastLog = 0 } @@ -1225,7 +1231,8 @@ private[spark] class Client( if (log.isDebugEnabled) { logDebug(formatReportDetails(report, getDriverLogsLink(report))) } else if (lastState != state) { - logInfo(formatReportDetails(report, getDriverLogsLink(report))) + logInfo(log"${MDC(LogKey.REPORT_DETAILS, + formatReportDetails(report, getDriverLogsLink(report)))}") } } @@ -1347,7 +1354,7 @@ private[spark] class Client( .getOrElse(IMap.empty) } catch { case e: Exception => - logWarning(log"Unable to get driver log links for ${MDC(APP_ID, appId)}: ", e) + logWarning(log"Unable to get driver log links for ${MDC(LogKey.APP_ID, appId)}: ", e) // Include the full stack trace only at DEBUG level to reduce verbosity logDebug(s"Unable to get driver log links for $appId", e) IMap.empty @@ -1367,8 +1374,10 @@ private[spark] class Client( if (!launcherBackend.isConnected() && fireAndForget) { val report = getApplicationReport() val state = report.getYarnApplicationState - logInfo(s"Application report for $appId (state: $state)") - logInfo(formatReportDetails(report, getDriverLogsLink(report))) + logInfo(log"Application report for ${MDC(LogKey.APP_ID, appId)} " + + log"(state: ${MDC(LogKey.APP_STATE, state)})") + logInfo(log"${MDC(LogKey.REPORT_DETAILS, + formatReportDetails(report, getDriverLogsLink(report)))}") if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { throw new SparkException(s"Application $appId finished with status: $state") } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 202ef36166d2..81588b2fcdda 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -19,7 +19,8 @@ package org.apache.spark.deploy.yarn import scala.collection.mutable.ArrayBuffer -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.ARGS // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware ! 
private[spark] class ClientArguments(args: Array[String]) extends Logging { @@ -75,7 +76,7 @@ private[spark] class ClientArguments(args: Array[String]) extends Logging { } if (verbose) { - logInfo(s"Parsed user args for YARN application: [${userArgs.mkString(" ")}]") + logInfo(log"Parsed user args for YARN application: [${MDC(ARGS, userArgs.mkString(" "))}]") } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 81b210a2297a..035f02f16fbe 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -38,7 +38,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.Records import org.apache.spark.{SecurityManager, SparkConf, SparkException} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC, MessageWithContext} +import org.apache.spark.internal.LogKey.{EXECUTOR_ENVS, EXECUTOR_LAUNCH_COMMANDS, EXECUTOR_RESOURCES} import org.apache.spark.internal.config._ import org.apache.spark.network.util.JavaUtils import org.apache.spark.util.Utils @@ -68,21 +69,23 @@ private[yarn] class ExecutorRunnable( startContainer() } - def launchContextDebugInfo(): String = { + def launchContextDebugInfo(): MessageWithContext = { val commands = prepareCommand() val env = prepareEnvironment() - s""" - |=============================================================================== - |Default YARN executor launch context: - | env: - |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s" $k -> $v\n" }.mkString} - | command: - | ${Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n ")} - | - | resources: - |${localResources.map { case (k, v) => s" $k -> $v\n" }.mkString} - |===============================================================================""".stripMargin + // scalastyle:off line.size.limit + log""" + |=============================================================================== + |Default YARN executor launch context: + | env: + |${MDC(EXECUTOR_ENVS, Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s" $k -> $v\n" }.mkString)} + | command: + | ${MDC(EXECUTOR_LAUNCH_COMMANDS, Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n "))} + | + | resources: + |${MDC(EXECUTOR_RESOURCES, localResources.map { case (k, v) => s" $k -> $v\n" }.mkString)} + |===============================================================================""".stripMargin + // scalastyle:on line.size.limit } def startContainer(): java.util.Map[String, ByteBuffer] = { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala index e0d66af348e2..16f7581c7af4 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala @@ -29,7 +29,8 @@ import org.apache.hadoop.yarn.util.RackResolver import org.apache.logging.log4j.{Level, LogManager} import org.apache.logging.log4j.core.Logger -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.NODE_LOCATION /** * Re-implement YARN's [[RackResolver]] for hadoop 
releases without YARN-9332. @@ -77,8 +78,8 @@ private[spark] class SparkRackResolver(conf: Configuration) extends Logging { val rNameList = dnsToSwitchMapping.resolve(hostNames.toList.asJava).asScala if (rNameList == null || rNameList.isEmpty) { hostNames.foreach(nodes += new NodeBase(_, NetworkTopology.DEFAULT_RACK)) - logInfo(s"Got an error when resolving hostNames. " + - s"Falling back to ${NetworkTopology.DEFAULT_RACK} for all") + logInfo(log"Got an error when resolving hostNames. " + + log"Falling back to ${MDC(NODE_LOCATION, NetworkTopology.DEFAULT_RACK)} for all") } else { for ((hostName, rName) <- hostNames.zip(rNameList)) { if (Strings.isNullOrEmpty(rName)) { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index efe766be8356..b410ea2eb44d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -41,7 +41,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{APP_STATE, CONFIG, CONFIG2, CONFIG3, CONTAINER_ID, ERROR, EXECUTOR_ID, HOST, REASON} +import org.apache.spark.internal.LogKey import org.apache.spark.internal.config._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID @@ -195,7 +195,8 @@ private[yarn] class YarnAllocator( case (true, false) => true case (true, true) => logWarning(log"Yarn Executor Decommissioning is supported only " + - log"when ${MDC(CONFIG, SHUFFLE_SERVICE_ENABLED.key)} is set to false. See: SPARK-39018.") + log"when ${MDC(LogKey.CONFIG, SHUFFLE_SERVICE_ENABLED.key)} is set to false. 
" + + log"See: SPARK-39018.") false case (false, _) => false } @@ -313,7 +314,8 @@ private[yarn] class YarnAllocator( if (!rpIdToYarnResource.containsKey(rp.id)) { // track the resource profile if not already there getOrUpdateRunningExecutorForRPId(rp.id) - logInfo(s"Resource profile ${rp.id} doesn't exist, adding it") + logInfo(log"Resource profile ${MDC(LogKey.RESOURCE_PROFILE_ID, rp.id)} doesn't exist, " + + log"adding it") val resourcesWithDefaults = ResourceProfile.getResourcesForClusterManager(rp.id, rp.executorResources, @@ -399,8 +401,8 @@ private[yarn] class YarnAllocator( val res = resourceProfileToTotalExecs.map { case (rp, numExecs) => createYarnResourceForResourceProfile(rp) if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) { - logInfo(s"Driver requested a total number of $numExecs executor(s) " + - s"for resource profile id: ${rp.id}.") + logInfo(log"Driver requested a total number of ${MDC(LogKey.COUNT, numExecs)} " + + log"executor(s) for resource profile id: ${MDC(LogKey.RESOURCE_PROFILE_ID, rp.id)}.") targetNumExecutorsPerResourceProfileId(rp.id) = numExecs allocatorNodeHealthTracker.setSchedulerExcludedNodes(excludedNodes) true @@ -421,7 +423,8 @@ private[yarn] class YarnAllocator( val (_, rpId) = containerIdToExecutorIdAndResourceProfileId(container.getId) internalReleaseContainer(container) getOrUpdateRunningExecutorForRPId(rpId).remove(executorId) - case _ => logWarning(log"Attempted to kill unknown executor ${MDC(EXECUTOR_ID, executorId)}!") + case _ => logWarning(log"Attempted to kill unknown executor " + + log"${MDC(LogKey.EXECUTOR_ID, executorId)}!") } } @@ -520,12 +523,13 @@ private[yarn] class YarnAllocator( if (missing > 0) { val resource = rpIdToYarnResource.get(rpId) if (log.isInfoEnabled()) { - var requestContainerMessage = s"Will request $missing executor container(s) for " + - s" ResourceProfile Id: $rpId, each with " + - s"${resource.getVirtualCores} core(s) and " + - s"${resource.getMemorySize} MB memory." - if (resource.getResources().nonEmpty) { - requestContainerMessage ++= s" with custom resources: $resource" + var requestContainerMessage = log"Will request ${MDC(LogKey.COUNT, missing)} executor " + + log"container(s) for ResourceProfile Id: ${MDC(LogKey.RESOURCE_PROFILE_ID, rpId)}, " + + log"each with ${MDC(LogKey.VIRTUAL_CORES, resource.getVirtualCores)} core(s) and " + + log"${MDC(LogKey.MEMORY_SIZE, resource.getMemorySize)} MB memory." 
+ if (resource.getResources.nonEmpty) { + requestContainerMessage = requestContainerMessage + + log" with custom resources: ${MDC(LogKey.RESOURCE, resource)}" } logInfo(requestContainerMessage) } @@ -536,7 +540,8 @@ private[yarn] class YarnAllocator( } val cancelledContainers = staleRequests.size if (cancelledContainers > 0) { - logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)") + logInfo(log"Canceled ${MDC(LogKey.COUNT, cancelledContainers)} container request(s) " + + log"(locality no longer needed)") } // consider the number of new containers and cancelled stale containers available @@ -570,8 +575,8 @@ private[yarn] class YarnAllocator( amClient.removeContainerRequest(nonLocal) } if (numToCancel > 0) { - logInfo(s"Canceled $numToCancel unlocalized container requests to " + - s"resubmit with locality") + logInfo(log"Canceled ${MDC(LogKey.COUNT, numToCancel)} unlocalized container " + + log"requests to resubmit with locality") } } @@ -582,16 +587,20 @@ private[yarn] class YarnAllocator( if (log.isInfoEnabled()) { val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null) if (anyHost.nonEmpty) { - logInfo(s"Submitted ${anyHost.size} unlocalized container requests.") + logInfo(log"Submitted ${MDC(LogKey.COUNT, anyHost.size)}} unlocalized container " + + log"requests.") } localized.foreach { request => - logInfo(s"Submitted container request for host ${hostStr(request)}.") + logInfo(log"Submitted container request for host " + + log"${MDC(LogKey.HOST, hostStr(request))}.") } } } else if (numPendingAllocate > 0 && missing < 0) { val numToCancel = math.min(numPendingAllocate, -missing) - logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new " + - s"desired total ${getOrUpdateTargetNumExecutorsForRPId(rpId)} executors.") + logInfo(log"Canceling requests for ${MDC(LogKey.COUNT, numToCancel)} executor " + + log"container(s) to have a new desired total " + + log"${MDC(LogKey.EXECUTOR_DESIRED_COUNT, + getOrUpdateTargetNumExecutorsForRPId(rpId))} executors.") // cancel pending allocate requests by taking locality preference into account val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel) cancelRequests.foreach(amClient.removeContainerRequest) @@ -697,8 +706,9 @@ private[yarn] class YarnAllocator( runAllocatedContainers(containersToUse) - logInfo("Received %d containers from YARN, launching executors on %d of them." 
- .format(allocatedContainers.size, containersToUse.size)) + logInfo(log"Received ${MDC(LogKey.COUNT, allocatedContainers.size)} containers from YARN, " + + log"launching executors on ${MDC(LogKey.EXECUTOR_LAUNCH_COUNT, containersToUse.size)} " + + log"of them.") } /** @@ -751,8 +761,10 @@ private[yarn] class YarnAllocator( val executorId = executorIdCounter.toString val yarnResourceForRpId = rpIdToYarnResource.get(rpId) assert(container.getResource.getMemorySize >= yarnResourceForRpId.getMemorySize) - logInfo(s"Launching container $containerId on host $executorHostname " + - s"for executor with ID $executorId for ResourceProfile Id $rpId") + logInfo(log"Launching container ${MDC(LogKey.CONTAINER_ID, containerId)} " + + log"on host ${MDC(LogKey.HOST, executorHostname)} for " + + log"executor with ID ${MDC(LogKey.EXECUTOR_ID, executorId)} for " + + log"ResourceProfile Id ${MDC(LogKey.RESOURCE_PROFILE_ID, rpId)}") val rp = rpIdToResourceProfile(rpId) val defaultResources = ResourceProfile.getDefaultProfileExecutorResources(sparkConf) @@ -790,8 +802,8 @@ private[yarn] class YarnAllocator( getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() launchingExecutorContainerIds.remove(containerId) if (NonFatal(e)) { - logError(log"Failed to launch executor ${MDC(EXECUTOR_ID, executorId)} " + - log"on container ${MDC(CONTAINER_ID, containerId)}", e) + logError(log"Failed to launch executor ${MDC(LogKey.EXECUTOR_ID, executorId)} " + + log"on container ${MDC(LogKey.CONTAINER_ID, containerId)}", e) // Assigned container should be released immediately // to avoid unnecessary resource occupation. amClient.releaseAssignedContainer(containerId) @@ -805,9 +817,9 @@ private[yarn] class YarnAllocator( updateInternalState(rpId, executorId, container) } } else { - logInfo(("Skip launching executorRunnable as running executors count: %d " + - "reached target executors count: %d.").format(rpRunningExecs, - getOrUpdateTargetNumExecutorsForRPId(rpId))) + logInfo(log"Skip launching executorRunnable as running executors count: " + + log"${MDC(LogKey.COUNT, rpRunningExecs)} reached target executors count: " + + log"${MDC(LogKey.EXECUTOR_TARGET_COUNT, getOrUpdateTargetNumExecutorsForRPId(rpId))}.") } } } @@ -849,47 +861,47 @@ private[yarn] class YarnAllocator( case Some((executorId, _)) => getOrUpdateRunningExecutorForRPId(rpId).remove(executorId) case None => logWarning(log"Cannot find executorId for container: " + - log"${MDC(CONTAINER_ID, containerId)}") + log"${MDC(LogKey.CONTAINER_ID, containerId)}") } - logInfo("Completed container %s%s (state: %s, exit status: %s)".format( - containerId, - onHostStr, - completedContainer.getState, - completedContainer.getExitStatus)) + logInfo(log"Completed container ${MDC(LogKey.CONTAINER_ID, containerId)}" + + log"${MDC(LogKey.HOST, onHostStr)} " + + log"(state: ${MDC(LogKey.CONTAINER_STATE, completedContainer.getState)}, " + + log"exit status: ${MDC(LogKey.EXIT_CODE, completedContainer.getExitStatus)})") val exitStatus = completedContainer.getExitStatus val (exitCausedByApp, containerExitReason) = exitStatus match { case _ if shutdown => - (false, log"Executor for container ${MDC(CONTAINER_ID, containerId)} exited after " + - log"Application shutdown.") + (false, log"Executor for container ${MDC(LogKey.CONTAINER_ID, containerId)} " + + log"exited after Application shutdown.") case ContainerExitStatus.SUCCESS => - (false, log"Executor for container ${MDC(CONTAINER_ID, containerId)} exited because " + - log"of a YARN event (e.g., preemption) and not because of an error 
in the running " + - log"job.") + (false, log"Executor for container ${MDC(LogKey.CONTAINER_ID, containerId)} " + + log"exited because of a YARN event (e.g., preemption) and not because of an " + + log"error in the running job.") case ContainerExitStatus.PREEMPTED => // Preemption is not the fault of the running tasks, since YARN preempts containers // merely to do resource sharing, and tasks that fail due to preempted executors could // just as easily finish on any other executor. See SPARK-8167. - (false, log"Container ${MDC(CONTAINER_ID, containerId)}${MDC(HOST, onHostStr)} " + - log"was preempted.") + (false, log"Container ${MDC(LogKey.CONTAINER_ID, containerId)}" + + log"${MDC(LogKey.HOST, onHostStr)} was preempted.") // Should probably still count memory exceeded exit codes towards task failures case ContainerExitStatus.KILLED_EXCEEDED_VMEM => val vmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX virtual memory used".r val diag = vmemExceededPattern.findFirstIn(completedContainer.getDiagnostics) .map(_.concat(".")).getOrElse("") val message = log"Container killed by YARN for exceeding virtual memory limits. " + - log"${MDC(ERROR, diag)} Consider boosting " + - log"${MDC(CONFIG, EXECUTOR_MEMORY_OVERHEAD.key)} or boosting " + - log"${MDC(CONFIG2, YarnConfiguration.NM_VMEM_PMEM_RATIO)} or disabling " + - log"${MDC(CONFIG3, YarnConfiguration.NM_VMEM_CHECK_ENABLED)} because of YARN-4714." + log"${MDC(LogKey.ERROR, diag)} Consider boosting " + + log"${MDC(LogKey.CONFIG, EXECUTOR_MEMORY_OVERHEAD.key)} or boosting " + + log"${MDC(LogKey.CONFIG2, YarnConfiguration.NM_VMEM_PMEM_RATIO)} or disabling " + + log"${MDC(LogKey.CONFIG3, YarnConfiguration.NM_VMEM_CHECK_ENABLED)} " + + log"because of YARN-4714." (true, message) case ContainerExitStatus.KILLED_EXCEEDED_PMEM => val pmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX physical memory used".r val diag = pmemExceededPattern.findFirstIn(completedContainer.getDiagnostics) .map(_.concat(".")).getOrElse("") val message = log"Container killed by YARN for exceeding physical memory limits. " + - log"${MDC(ERROR, diag)} Consider boosting " + - log"${MDC(CONFIG, EXECUTOR_MEMORY_OVERHEAD.key)}." + log"${MDC(LogKey.ERROR, diag)} Consider boosting " + + log"${MDC(LogKey.CONFIG, EXECUTOR_MEMORY_OVERHEAD.key)}." (true, message) case other_exit_status => val exitStatus = completedContainer.getExitStatus @@ -900,17 +912,19 @@ private[yarn] class YarnAllocator( // SPARK-26269: follow YARN's behaviour, see details in // org.apache.hadoop.yarn.util.Apps#shouldCountTowardsNodeBlacklisting if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) { - (false, log"Container marked as failed: ${MDC(CONTAINER_ID, containerId)}" + - log"${MDC(HOST, onHostStr)}. Exit status: ${MDC(APP_STATE, exitStatus)}. " + - log"Possible causes: ${MDC(REASON, sparkExitCodeReason)} " + - log"Diagnostics: ${MDC(ERROR, completedContainer.getDiagnostics)}.") + (false, log"Container marked as failed: ${MDC(LogKey.CONTAINER_ID, containerId)}" + + log"${MDC(LogKey.HOST, onHostStr)}. " + + log"Exit status: ${MDC(LogKey.EXIT_CODE, exitStatus)}. " + + log"Possible causes: ${MDC(LogKey.REASON, sparkExitCodeReason)} " + + log"Diagnostics: ${MDC(LogKey.ERROR, completedContainer.getDiagnostics)}.") } else { // completed container from a bad node allocatorNodeHealthTracker.handleResourceAllocationFailure(hostOpt) - (true, log"Container from a bad node: ${MDC(CONTAINER_ID, containerId)}" + - log"${MDC(HOST, onHostStr)}. Exit status: ${MDC(APP_STATE, exitStatus)}. 
" + - log"Possible causes: ${MDC(REASON, sparkExitCodeReason)} " + - log"Diagnostics: ${MDC(ERROR, completedContainer.getDiagnostics)}.") + (true, log"Container from a bad node: ${MDC(LogKey.CONTAINER_ID, containerId)}" + + log"${MDC(LogKey.HOST, onHostStr)}. " + + log"Exit status: ${MDC(LogKey.EXIT_CODE, exitStatus)}. " + + log"Possible causes: ${MDC(LogKey.REASON, sparkExitCodeReason)} " + + log"Diagnostics: ${MDC(LogKey.ERROR, completedContainer.getDiagnostics)}.") } } if (exitCausedByApp) { @@ -981,7 +995,7 @@ private[yarn] class YarnAllocator( context.reply(releasedExecutorLossReasons.remove(eid).get) } else { logWarning(log"Tried to get the loss reason for non-existent executor " + - log"${MDC(EXECUTOR_ID, eid)}") + log"${MDC(LogKey.EXECUTOR_ID, eid)}") context.sendFailure( new SparkException(s"Fail to find loss reason for non-existent executor $eid")) } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala index 22937ed8117a..3ba6687b0224 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorNodeHealthTracker.scala @@ -25,7 +25,8 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.spark.SparkConf import org.apache.spark.deploy.ExecutorFailureTracker import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{FAILURES, HOST, NODES} import org.apache.spark.internal.config._ import org.apache.spark.scheduler.HealthTracker @@ -90,7 +91,8 @@ private[spark] class YarnAllocatorNodeHealthTracker( private def updateAllocationExcludedNodes(hostname: String): Unit = { val failuresOnHost = failureTracker.numFailuresOnHost(hostname) if (failuresOnHost > maxFailuresPerHost) { - logInfo(s"excluding $hostname as YARN allocation failed $failuresOnHost times") + logInfo(log"excluding ${MDC(HOST, hostname)} as YARN allocation failed " + + log"${MDC(FAILURES, failuresOnHost)} times") allocatorExcludedNodeList.put( hostname, failureTracker.clock.getTimeMillis() + excludeOnFailureTimeoutMillis) @@ -125,10 +127,12 @@ private[spark] class YarnAllocatorNodeHealthTracker( val additions = (nodesToExclude -- currentExcludededYarnNodes).toList.sorted val removals = (currentExcludededYarnNodes -- nodesToExclude).toList.sorted if (additions.nonEmpty) { - logInfo(s"adding nodes to YARN application master's excluded node list: $additions") + logInfo(log"adding nodes to YARN application master's " + + log"excluded node list: ${MDC(NODES, additions)}") } if (removals.nonEmpty) { - logInfo(s"removing nodes from YARN application master's excluded node list: $removals") + logInfo(log"removing nodes from YARN application master's " + + log"excluded node list: ${MDC(NODES, removals)}") } if (additions.nonEmpty || removals.nonEmpty) { // Note YARNs api for excluding nodes is updateBlacklist. 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index ccc0bc9f715e..c7defac98b90 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -27,7 +27,7 @@ import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport} import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.APP_STATE +import org.apache.spark.internal.LogKey.{APP_ID, APP_STATE} import org.apache.spark.launcher.SparkAppHandle import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -99,7 +99,7 @@ private[spark] class YarnClientSchedulerBackend( throw new SparkException(exceptionMsg) } if (state == YarnApplicationState.RUNNING) { - logInfo(s"Application ${appId.get} has started running.") + logInfo(log"Application ${MDC(APP_ID, appId.get)} has started running.") } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index d7f285aeb892..c6976caf3a77 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} import org.apache.spark.SparkContext import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{EXECUTOR_ID, HOST_PORT, REASON} +import org.apache.spark.internal.LogKey import org.apache.spark.internal.config.UI._ import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc._ @@ -221,7 +221,9 @@ private[spark] abstract class YarnSchedulerBackend( if (hasFilter) { // SPARK-26255: Append user provided filters(spark.ui.filters) with yarn filter. val allFilters = Seq(filterName) ++ conf.get(UI_FILTERS) - logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase") + logInfo(log"Add WebUI Filter. ${MDC(LogKey.UI_FILTER, filterName)}, " + + log"${MDC(LogKey.UI_FILTER_PARAMS, filterParams)}, " + + log"${MDC(LogKey.UI_PROXY_BASE, proxyBase)}") // For already installed handlers, prepend the filter. scheduler.sc.ui.foreach { ui => @@ -306,8 +308,8 @@ private[spark] abstract class YarnSchedulerBackend( .recover { case NonFatal(e) => logWarning(log"Attempted to get executor loss reason for executor id " + - log"${MDC(EXECUTOR_ID, executorId)} at RPC address " + - log"${MDC(HOST_PORT, executorRpcAddress)}, but got no response. " + + log"${MDC(LogKey.EXECUTOR_ID, executorId)} at RPC address " + + log"${MDC(LogKey.HOST_PORT, executorRpcAddress)}, but got no response. 
" + log"Marking as agent lost.", e) RemoveExecutor(executorId, ExecutorProcessLost()) }(ThreadUtils.sameThread) @@ -332,7 +334,7 @@ private[spark] abstract class YarnSchedulerBackend( override def receive: PartialFunction[Any, Unit] = { case RegisterClusterManager(am) => - logInfo(s"ApplicationMaster registered as $am") + logInfo(log"ApplicationMaster registered as ${MDC(LogKey.RPC_ENDPOINT_REF, am)}") amEndpoint = Option(am) reset() @@ -346,7 +348,7 @@ private[spark] abstract class YarnSchedulerBackend( case r @ RemoveExecutor(executorId, reason) => if (!stopped.get) { logWarning(log"Requesting driver to remove executor " + - log"${MDC(EXECUTOR_ID, executorId)} for reason ${MDC(REASON, reason)}") + log"${MDC(LogKey.EXECUTOR_ID, executorId)} for reason ${MDC(LogKey.REASON, reason)}") driverEndpoint.send(r) } @@ -395,7 +397,8 @@ private[spark] abstract class YarnSchedulerBackend( override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (amEndpoint.exists(_.address == remoteAddress)) { - logWarning(log"ApplicationMaster has disassociated: ${MDC(HOST_PORT, remoteAddress)}") + logWarning(log"ApplicationMaster has disassociated: " + + log"${MDC(LogKey.HOST_PORT, remoteAddress)}") amEndpoint = None } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org