This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d9d1f3dc05a [SPARK-48134][CORE] Spark core (java side): Migrate `error/warn/info` with variables to structured logging framework
3d9d1f3dc05a is described below

commit 3d9d1f3dc05a2825bf315c68fc4e4232354dbd00
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Tue May 7 13:08:00 2024 -0700

    [SPARK-48134][CORE] Spark core (java side): Migrate `error/warn/info` with variables to structured logging framework
    
    ### What changes were proposed in this pull request?
    The PR aims to:
    1. Migrate `error/warn/info` calls with variables in the `core` module (Java side) to the `structured logging framework` (a before/after sketch of the pattern follows the list).
    2. Convert all dependencies on `org.slf4j.Logger` & `org.slf4j.LoggerFactory` to `org.apache.spark.internal.Logger` & `org.apache.spark.internal.LoggerFactory`, in order to completely `prohibit` importing `org.slf4j.Logger` & `org.slf4j.LoggerFactory` in Java code later.
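    
    A minimal before/after sketch of the migration pattern (the `MyWriter` class, `cleanup` method and `file` variable are hypothetical, and it assumes the internal `LoggerFactory` exposes `getLogger(Class)` mirroring slf4j; the `Logger`/`LogKeys`/`MDC` API is the `org.apache.spark.internal` one used throughout this diff):
    
        import java.io.File;
    
        // After the migration: Spark's internal Logger replaces org.slf4j.Logger
        import org.apache.spark.internal.LogKeys;
        import org.apache.spark.internal.Logger;
        import org.apache.spark.internal.LoggerFactory;
        import org.apache.spark.internal.MDC;
    
        public class MyWriter {
          private static final Logger logger = LoggerFactory.getLogger(MyWriter.class);
    
          void cleanup(File file) {
            // Before (slf4j): logger.error("Unable to delete spill file {}", file.getAbsolutePath());
            // After: each placeholder value is tagged with a LogKey via MDC.of
            if (file.exists() && !file.delete()) {
              logger.error("Unable to delete spill file {}",
                MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath()));
            }
          }
        }
    
    The LogKey constants (e.g. PATH, MEMORY_CONSUMER, PAGE_SIZE) come from LogKey.scala in `common/utils`, which this commit extends.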
    
    ### Why are the changes needed?
    To enhance Apache Spark's logging system by implementing structured logging.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Pass GA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46390 from panbingkun/core_java_sl.
    
    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../java/org/apache/spark/internal/Logger.java     | 21 +++++++++++++++-
 .../scala/org/apache/spark/internal/LogKey.scala   |  9 +++++++
 .../org/apache/spark/io/ReadAheadInputStream.java  | 19 ++++++++-------
 .../org/apache/spark/memory/TaskMemoryManager.java | 28 +++++++++++++++-------
 .../shuffle/sort/BypassMergeSortShuffleWriter.java |  9 ++++---
 .../spark/shuffle/sort/ShuffleExternalSorter.java  | 25 ++++++++++---------
 .../spark/shuffle/sort/UnsafeShuffleWriter.java    |  9 ++++---
 .../sort/io/LocalDiskShuffleMapOutputWriter.java   | 10 ++++----
 .../apache/spark/unsafe/map/BytesToBytesMap.java   | 12 ++++++----
 .../unsafe/sort/UnsafeExternalSorter.java          | 21 +++++++++-------
 .../unsafe/sort/UnsafeSorterSpillReader.java       |  4 ++--
 11 files changed, 113 insertions(+), 54 deletions(-)

diff --git a/common/utils/src/main/java/org/apache/spark/internal/Logger.java b/common/utils/src/main/java/org/apache/spark/internal/Logger.java
index 2b4dd3bb45bc..d8ab26424bae 100644
--- a/common/utils/src/main/java/org/apache/spark/internal/Logger.java
+++ b/common/utils/src/main/java/org/apache/spark/internal/Logger.java
@@ -34,6 +34,10 @@ public class Logger {
     this.slf4jLogger = slf4jLogger;
   }
 
+  public boolean isErrorEnabled() {
+    return slf4jLogger.isErrorEnabled();
+  }
+
   public void error(String msg) {
     slf4jLogger.error(msg);
   }
@@ -58,6 +62,10 @@ public class Logger {
     }
   }
 
+  public boolean isWarnEnabled() {
+    return slf4jLogger.isWarnEnabled();
+  }
+
   public void warn(String msg) {
     slf4jLogger.warn(msg);
   }
@@ -82,6 +90,10 @@ public class Logger {
     }
   }
 
+  public boolean isInfoEnabled() {
+    return slf4jLogger.isInfoEnabled();
+  }
+
   public void info(String msg) {
     slf4jLogger.info(msg);
   }
@@ -106,6 +118,10 @@ public class Logger {
     }
   }
 
+  public boolean isDebugEnabled() {
+    return slf4jLogger.isDebugEnabled();
+  }
+
   public void debug(String msg) {
     slf4jLogger.debug(msg);
   }
@@ -126,6 +142,10 @@ public class Logger {
     slf4jLogger.debug(msg, throwable);
   }
 
+  public boolean isTraceEnabled() {
+    return slf4jLogger.isTraceEnabled();
+  }
+
   public void trace(String msg) {
     slf4jLogger.trace(msg);
   }
@@ -146,7 +166,6 @@ public class Logger {
     slf4jLogger.trace(msg, throwable);
   }
 
-
   private void withLogContext(
       String pattern,
       MDC[] mdcs,
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index d4e1d9f535af..c127f9c3d1f9 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -168,6 +168,7 @@ object LogKeys {
   case object EXCEPTION extends LogKey
   case object EXECUTE_INFO extends LogKey
   case object EXECUTE_KEY extends LogKey
+  case object EXECUTION_MEMORY_SIZE extends LogKey
   case object EXECUTION_PLAN_LEAVES extends LogKey
   case object EXECUTOR_BACKEND extends LogKey
   case object EXECUTOR_DESIRED_COUNT extends LogKey
@@ -302,6 +303,7 @@ object LogKeys {
   case object MAX_SLOTS extends LogKey
   case object MAX_SPLIT_BYTES extends LogKey
   case object MAX_TABLE_PARTITION_METADATA_SIZE extends LogKey
+  case object MEMORY_CONSUMER extends LogKey
   case object MEMORY_POOL_NAME extends LogKey
   case object MEMORY_SIZE extends LogKey
   case object MERGE_DIR_NAME extends LogKey
@@ -342,6 +344,7 @@ object LogKeys {
   case object NUM_CONCURRENT_WRITER extends LogKey
   case object NUM_CORES extends LogKey
   case object NUM_DROPPED_PARTITIONS extends LogKey
+  case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey
   case object NUM_EVENTS extends LogKey
   case object NUM_EXAMPLES extends LogKey
   case object NUM_EXECUTOR_CORES extends LogKey
@@ -375,6 +378,8 @@ object LogKeys {
   case object NUM_RIGHT_PARTITION_VALUES extends LogKey
   case object NUM_SEQUENCES extends LogKey
   case object NUM_SLOTS extends LogKey
+  case object NUM_SPILL_INFOS extends LogKey
+  case object NUM_SPILL_WRITERS extends LogKey
   case object NUM_TASKS extends LogKey
   case object NUM_TASK_CPUS extends LogKey
   case object NUM_VERSIONS_RETAIN extends LogKey
@@ -394,6 +399,7 @@ object LogKeys {
   case object OP_TYPE extends LogKey
   case object OUTPUT extends LogKey
   case object OVERHEAD_MEMORY_SIZE extends LogKey
+  case object PAGE_SIZE extends LogKey
   case object PARSE_MODE extends LogKey
   case object PARTITIONED_FILE_READER extends LogKey
   case object PARTITIONER extends LogKey
@@ -502,6 +508,7 @@ object LogKeys {
   case object SOCKET_ADDRESS extends LogKey
   case object SPARK_DATA_STREAM extends LogKey
   case object SPARK_PLAN_ID extends LogKey
+  case object SPILL_TIMES extends LogKey
   case object SQL_TEXT extends LogKey
   case object SRC_PATH extends LogKey
   case object STAGE_ATTEMPT extends LogKey
@@ -516,6 +523,7 @@ object LogKeys {
   case object STORAGE_LEVEL extends LogKey
   case object STORAGE_LEVEL_DESERIALIZED extends LogKey
   case object STORAGE_LEVEL_REPLICATION extends LogKey
+  case object STORAGE_MEMORY_SIZE extends LogKey
   case object STORE_ID extends LogKey
   case object STREAMING_DATA_SOURCE_DESCRIPTION extends LogKey
   case object STREAMING_DATA_SOURCE_NAME extends LogKey
@@ -543,6 +551,7 @@ object LogKeys {
   case object TEMP_PATH extends LogKey
   case object TEST_SIZE extends LogKey
   case object THREAD extends LogKey
+  case object THREAD_ID extends LogKey
   case object THREAD_NAME extends LogKey
   case object TID extends LogKey
   case object TIME extends LogKey
diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
index 33dfa4422906..6d0bbc89e2b5 100644
--- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
+++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
@@ -13,13 +13,6 @@
  */
 package org.apache.spark.io;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import org.apache.spark.util.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.GuardedBy;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -30,6 +23,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
+import org.apache.spark.util.ThreadUtils;
 
 /**
  * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
@@ -205,7 +208,7 @@ public class ReadAheadInputStream extends InputStream {
       try {
         underlyingInputStream.close();
       } catch (IOException e) {
-        logger.warn(e.getMessage(), e);
+        logger.warn("{}", e, MDC.of(LogKeys.ERROR$.MODULE$, e.getMessage()));
       }
     }
   }
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 83352611770f..aeabd358144f 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -29,9 +29,11 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.Utils;
 
@@ -244,10 +246,12 @@ public class TaskMemoryManager {
       }
     } catch (ClosedByInterruptException e) {
       // This called by user to kill a task (e.g: speculative task).
-      logger.error("error while calling spill() on " + consumerToSpill, e);
+      logger.error("error while calling spill() on {}", e,
+        MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill));
       throw new RuntimeException(e.getMessage());
     } catch (IOException e) {
-      logger.error("error while calling spill() on " + consumerToSpill, e);
+      logger.error("error while calling spill() on {}", e,
+        MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill));
       // checkstyle.off: RegexpSinglelineJava
       throw new SparkOutOfMemoryError("error while calling spill() on " + consumerToSpill + " : "
         + e.getMessage());
@@ -270,24 +274,29 @@ public class TaskMemoryManager {
    * Dump the memory usage of all consumers.
    */
   public void showMemoryUsage() {
-    logger.info("Memory used in task " + taskAttemptId);
+    logger.info("Memory used in task {}",
+      MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskAttemptId));
     synchronized (this) {
       long memoryAccountedForByConsumers = 0;
       for (MemoryConsumer c: consumers) {
         long totalMemUsage = c.getUsed();
         memoryAccountedForByConsumers += totalMemUsage;
         if (totalMemUsage > 0) {
-          logger.info("Acquired by " + c + ": " + 
Utils.bytesToString(totalMemUsage));
+          logger.info("Acquired by {}: {}",
+            MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, c),
+            MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, 
Utils.bytesToString(totalMemUsage)));
         }
       }
       long memoryNotAccountedFor =
         memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers;
       logger.info(
         "{} bytes of memory were used by task {} but are not associated with 
specific consumers",
-        memoryNotAccountedFor, taskAttemptId);
+        MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, memoryNotAccountedFor),
+        MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskAttemptId));
       logger.info(
         "{} bytes of memory are used for execution and {} bytes of memory are 
used for storage",
-        memoryManager.executionMemoryUsed(), 
memoryManager.storageMemoryUsed());
+        MDC.of(LogKeys.EXECUTION_MEMORY_SIZE$.MODULE$, 
memoryManager.executionMemoryUsed()),
+        MDC.of(LogKeys.STORAGE_MEMORY_SIZE$.MODULE$,  
memoryManager.storageMemoryUsed()));
     }
   }
 
@@ -333,7 +342,8 @@ public class TaskMemoryManager {
     try {
       page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
     } catch (OutOfMemoryError e) {
-      logger.warn("Failed to allocate a page ({} bytes), try again.", 
acquired);
+      logger.warn("Failed to allocate a page ({} bytes), try again.",
+        MDC.of(LogKeys.PAGE_SIZE$.MODULE$, acquired));
       // there is no enough memory actually, it means the actual free memory 
is smaller than
       // MemoryManager thought, we should keep the acquired memory.
       synchronized (this) {
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index d067c870acc9..284d1dd036b4 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -33,9 +33,11 @@ import scala.Tuple2;
 import scala.collection.Iterator;
 
 import com.google.common.io.Closeables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
 import org.apache.spark.Partitioner;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -223,7 +225,8 @@ final class BypassMergeSortShuffleWriter<K, V>
               writePartitionedDataWithStream(file, writer);
             }
             if (!file.delete()) {
-              logger.error("Unable to delete file for partition {}", i);
+              logger.error("Unable to delete file for partition {}",
+                MDC.of(LogKeys.PARTITION_ID$.MODULE$, i));
             }
           }
         }
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index b097089282ce..8fe432cfe239 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -23,17 +23,19 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.zip.Checksum;
 
-import org.apache.spark.SparkException;
 import scala.Tuple2;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.internal.config.package$;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
@@ -159,11 +161,11 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck
     if (!isFinalFile) {
       logger.info(
         "Task {} on Thread {} spilling sort data of {} to disk ({} {} so far)",
-        taskContext.taskAttemptId(),
-        Thread.currentThread().getId(),
-        Utils.bytesToString(getMemoryUsage()),
-        spills.size(),
-        spills.size() != 1 ? " times" : " time");
+        MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskContext.taskAttemptId()),
+        MDC.of(LogKeys.THREAD_ID$.MODULE$, Thread.currentThread().getId()),
+        MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(getMemoryUsage())),
+        MDC.of(LogKeys.NUM_SPILL_INFOS$.MODULE$, spills.size()),
+        MDC.of(LogKeys.SPILL_TIMES$.MODULE$, spills.size() != 1 ? "times" : "time"));
     }
 
     // This call performs the actual sort.
@@ -349,7 +351,8 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck
     }
     for (SpillInfo spill : spills) {
       if (spill.file.exists() && !spill.file.delete()) {
-        logger.error("Unable to delete spill file {}", spill.file.getPath());
+        logger.error("Unable to delete spill file {}",
+          MDC.of(LogKeys.PATH$.MODULE$, spill.file.getPath()));
       }
     }
   }
@@ -416,8 +419,8 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck
     // for tests
     assert(inMemSorter != null);
     if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
-      logger.info("Spilling data because number of spilledRecords crossed the 
threshold " +
-        numElementsForSpillThreshold);
+      logger.info("Spilling data because number of spilledRecords crossed the 
threshold {}" +
+        MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, 
numElementsForSpillThreshold));
       spill();
     }
 
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index f5949d6ae7a5..6da9d3def3f8 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -35,12 +35,14 @@ import scala.reflect.ClassTag$;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.spark.*;
 import org.apache.spark.annotation.Private;
 import org.apache.spark.internal.config.package$;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
 import org.apache.spark.io.CompressionCodec;
 import org.apache.spark.io.CompressionCodec$;
 import org.apache.spark.io.NioBufferedFileInputStream;
@@ -226,7 +228,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       sorter = null;
       for (SpillInfo spill : spills) {
         if (spill.file.exists() && !spill.file.delete()) {
-          logger.error("Error while deleting spill file {}", 
spill.file.getPath());
+          logger.error("Error while deleting spill file {}",
+            MDC.of(LogKeys.PATH$.MODULE$, spill.file.getPath()));
         }
       }
     }
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index efe508d1361c..fbf4abc160b6 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -26,10 +26,11 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.Optional;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.spark.SparkConf;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
 import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
 import org.apache.spark.shuffle.api.ShufflePartitionWriter;
 import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
@@ -123,7 +124,8 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {
   public void abort(Throwable error) throws IOException {
     cleanUp();
     if (outputTempFile != null && outputTempFile.exists() && !outputTempFile.delete()) {
-      log.warn("Failed to delete temporary shuffle file at {}", outputTempFile.getAbsolutePath());
+      log.warn("Failed to delete temporary shuffle file at {}",
+        MDC.of(LogKeys.PATH$.MODULE$, outputTempFile.getAbsolutePath()));
     }
   }
 
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 35c5efc77f6f..3506e2a88864 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -25,11 +25,13 @@ import java.util.LinkedList;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.Closeables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.spark.SparkEnv;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.MDC;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
@@ -392,7 +394,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
         // remove the spill file from disk
         File file = spillWriters.removeFirst().getFile();
         if (file != null && file.exists() && !file.delete()) {
-          logger.error("Was unable to delete spill file {}", 
file.getAbsolutePath());
+          logger.error("Was unable to delete spill file {}",
+            MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath()));
         }
       }
     }
@@ -893,7 +896,8 @@ public final class BytesToBytesMap extends MemoryConsumer {
       File file = spillWriters.removeFirst().getFile();
       if (file != null && file.exists()) {
         if (!file.delete()) {
-          logger.error("Was unable to delete spill file {}", 
file.getAbsolutePath());
+          logger.error("Was unable to delete spill file {}",
+            MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath()));
         }
       }
     }
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 2f9e1a9f4546..0be312d48a9d 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -28,11 +28,13 @@ import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.spark.TaskContext;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+import org.apache.spark.internal.MDC;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.SparkOutOfMemoryError;
 import org.apache.spark.memory.TaskMemoryManager;
@@ -217,10 +219,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     }
 
     logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
-      Thread.currentThread().getId(),
-      Utils.bytesToString(getMemoryUsage()),
-      spillWriters.size(),
-      spillWriters.size() > 1 ? " times" : " time");
+      MDC.of(LogKeys.THREAD_ID$.MODULE$, Thread.currentThread().getId()),
+      MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(getMemoryUsage())),
+      MDC.of(LogKeys.NUM_SPILL_WRITERS$.MODULE$, spillWriters.size()),
+      MDC.of(LogKeys.SPILL_TIMES$.MODULE$, spillWriters.size() > 1 ? "times" : "time"));
 
     ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
 
@@ -335,7 +337,8 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       File file = spill.getFile();
       if (file != null && file.exists()) {
         if (!file.delete()) {
-          logger.error("Was unable to delete spill file {}", 
file.getAbsolutePath());
+          logger.error("Was unable to delete spill file {}",
+            MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath()));
         }
       }
     }
@@ -476,8 +479,8 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
 
     assert(inMemSorter != null);
     if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
-      logger.info("Spilling data because number of spilledRecords crossed the 
threshold " +
-        numElementsForSpillThreshold);
+      logger.info("Spilling data because number of spilledRecords crossed the 
threshold {}",
+        MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, 
numElementsForSpillThreshold));
       spill();
     }
 
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index cf29835b2ce8..4eff6a70acca 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -23,13 +23,13 @@ import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.internal.config.ConfigEntry;
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
 import org.apache.spark.io.NioBufferedFileInputStream;
 import org.apache.spark.io.ReadAheadInputStream;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.unsafe.Platform;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.*;
 


