This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3d9d1f3dc05a [SPARK-48134][CORE] Spark core (java side): Migrate `error/warn/info` with variables to structured logging framework 3d9d1f3dc05a is described below commit 3d9d1f3dc05a2825bf315c68fc4e4232354dbd00 Author: panbingkun <panbing...@baidu.com> AuthorDate: Tue May 7 13:08:00 2024 -0700 [SPARK-48134][CORE] Spark core (java side): Migrate `error/warn/info` with variables to structured logging framework ### What changes were proposed in this pull request? This PR aims to: 1. migrate `error/warn/info` log calls with variables in module `core` to the `structured logging framework` on the Java side; 2. convert all dependencies on `org.slf4j.Logger & org.slf4j.LoggerFactory` to `org.apache.spark.internal.Logger & org.apache.spark.internal.LoggerFactory`, so that importing `org.slf4j.Logger & org.slf4j.LoggerFactory` in Java code can be completely prohibited later. ### Why are the changes needed? To enhance Apache Spark's logging system by implementing structured logging. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46390 from panbingkun/core_java_sl. 
Authored-by: panbingkun <panbing...@baidu.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../java/org/apache/spark/internal/Logger.java | 21 +++++++++++++++- .../scala/org/apache/spark/internal/LogKey.scala | 9 +++++++ .../org/apache/spark/io/ReadAheadInputStream.java | 19 ++++++++------- .../org/apache/spark/memory/TaskMemoryManager.java | 28 +++++++++++++++------- .../shuffle/sort/BypassMergeSortShuffleWriter.java | 9 ++++--- .../spark/shuffle/sort/ShuffleExternalSorter.java | 25 ++++++++++--------- .../spark/shuffle/sort/UnsafeShuffleWriter.java | 9 ++++--- .../sort/io/LocalDiskShuffleMapOutputWriter.java | 10 ++++---- .../apache/spark/unsafe/map/BytesToBytesMap.java | 12 ++++++---- .../unsafe/sort/UnsafeExternalSorter.java | 21 +++++++++------- .../unsafe/sort/UnsafeSorterSpillReader.java | 4 ++-- 11 files changed, 113 insertions(+), 54 deletions(-) diff --git a/common/utils/src/main/java/org/apache/spark/internal/Logger.java b/common/utils/src/main/java/org/apache/spark/internal/Logger.java index 2b4dd3bb45bc..d8ab26424bae 100644 --- a/common/utils/src/main/java/org/apache/spark/internal/Logger.java +++ b/common/utils/src/main/java/org/apache/spark/internal/Logger.java @@ -34,6 +34,10 @@ public class Logger { this.slf4jLogger = slf4jLogger; } + public boolean isErrorEnabled() { + return slf4jLogger.isErrorEnabled(); + } + public void error(String msg) { slf4jLogger.error(msg); } @@ -58,6 +62,10 @@ public class Logger { } } + public boolean isWarnEnabled() { + return slf4jLogger.isWarnEnabled(); + } + public void warn(String msg) { slf4jLogger.warn(msg); } @@ -82,6 +90,10 @@ public class Logger { } } + public boolean isInfoEnabled() { + return slf4jLogger.isInfoEnabled(); + } + public void info(String msg) { slf4jLogger.info(msg); } @@ -106,6 +118,10 @@ public class Logger { } } + public boolean isDebugEnabled() { + return slf4jLogger.isDebugEnabled(); + } + public void debug(String msg) { slf4jLogger.debug(msg); } @@ -126,6 +142,10 @@ public 
class Logger { slf4jLogger.debug(msg, throwable); } + public boolean isTraceEnabled() { + return slf4jLogger.isTraceEnabled(); + } + public void trace(String msg) { slf4jLogger.trace(msg); } @@ -146,7 +166,6 @@ public class Logger { slf4jLogger.trace(msg, throwable); } - private void withLogContext( String pattern, MDC[] mdcs, diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index d4e1d9f535af..c127f9c3d1f9 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -168,6 +168,7 @@ object LogKeys { case object EXCEPTION extends LogKey case object EXECUTE_INFO extends LogKey case object EXECUTE_KEY extends LogKey + case object EXECUTION_MEMORY_SIZE extends LogKey case object EXECUTION_PLAN_LEAVES extends LogKey case object EXECUTOR_BACKEND extends LogKey case object EXECUTOR_DESIRED_COUNT extends LogKey @@ -302,6 +303,7 @@ object LogKeys { case object MAX_SLOTS extends LogKey case object MAX_SPLIT_BYTES extends LogKey case object MAX_TABLE_PARTITION_METADATA_SIZE extends LogKey + case object MEMORY_CONSUMER extends LogKey case object MEMORY_POOL_NAME extends LogKey case object MEMORY_SIZE extends LogKey case object MERGE_DIR_NAME extends LogKey @@ -342,6 +344,7 @@ object LogKeys { case object NUM_CONCURRENT_WRITER extends LogKey case object NUM_CORES extends LogKey case object NUM_DROPPED_PARTITIONS extends LogKey + case object NUM_ELEMENTS_SPILL_THRESHOLD extends LogKey case object NUM_EVENTS extends LogKey case object NUM_EXAMPLES extends LogKey case object NUM_EXECUTOR_CORES extends LogKey @@ -375,6 +378,8 @@ object LogKeys { case object NUM_RIGHT_PARTITION_VALUES extends LogKey case object NUM_SEQUENCES extends LogKey case object NUM_SLOTS extends LogKey + case object NUM_SPILL_INFOS extends LogKey + case object NUM_SPILL_WRITERS extends LogKey case object NUM_TASKS 
extends LogKey case object NUM_TASK_CPUS extends LogKey case object NUM_VERSIONS_RETAIN extends LogKey @@ -394,6 +399,7 @@ object LogKeys { case object OP_TYPE extends LogKey case object OUTPUT extends LogKey case object OVERHEAD_MEMORY_SIZE extends LogKey + case object PAGE_SIZE extends LogKey case object PARSE_MODE extends LogKey case object PARTITIONED_FILE_READER extends LogKey case object PARTITIONER extends LogKey @@ -502,6 +508,7 @@ object LogKeys { case object SOCKET_ADDRESS extends LogKey case object SPARK_DATA_STREAM extends LogKey case object SPARK_PLAN_ID extends LogKey + case object SPILL_TIMES extends LogKey case object SQL_TEXT extends LogKey case object SRC_PATH extends LogKey case object STAGE_ATTEMPT extends LogKey @@ -516,6 +523,7 @@ object LogKeys { case object STORAGE_LEVEL extends LogKey case object STORAGE_LEVEL_DESERIALIZED extends LogKey case object STORAGE_LEVEL_REPLICATION extends LogKey + case object STORAGE_MEMORY_SIZE extends LogKey case object STORE_ID extends LogKey case object STREAMING_DATA_SOURCE_DESCRIPTION extends LogKey case object STREAMING_DATA_SOURCE_NAME extends LogKey @@ -543,6 +551,7 @@ object LogKeys { case object TEMP_PATH extends LogKey case object TEST_SIZE extends LogKey case object THREAD extends LogKey + case object THREAD_ID extends LogKey case object THREAD_NAME extends LogKey case object TID extends LogKey case object TIME extends LogKey diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java index 33dfa4422906..6d0bbc89e2b5 100644 --- a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java +++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java @@ -13,13 +13,6 @@ */ package org.apache.spark.io; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import org.apache.spark.util.ThreadUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import 
javax.annotation.concurrent.GuardedBy; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -30,6 +23,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.concurrent.GuardedBy; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + +import org.apache.spark.internal.Logger; +import org.apache.spark.internal.LoggerFactory; +import org.apache.spark.internal.LogKeys; +import org.apache.spark.internal.MDC; +import org.apache.spark.util.ThreadUtils; /** * {@link InputStream} implementation which asynchronously reads ahead from the underlying input @@ -205,7 +208,7 @@ public class ReadAheadInputStream extends InputStream { try { underlyingInputStream.close(); } catch (IOException e) { - logger.warn(e.getMessage(), e); + logger.warn("{}", e, MDC.of(LogKeys.ERROR$.MODULE$, e.getMessage())); } } } diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 83352611770f..aeabd358144f 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -29,9 +29,11 @@ import java.util.Map; import java.util.TreeMap; import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.spark.internal.Logger; +import org.apache.spark.internal.LoggerFactory; +import org.apache.spark.internal.LogKeys; +import org.apache.spark.internal.MDC; import org.apache.spark.unsafe.memory.MemoryBlock; import org.apache.spark.util.Utils; @@ -244,10 +246,12 @@ public class TaskMemoryManager { } } catch (ClosedByInterruptException e) { // This called by user to kill a task (e.g: speculative task). 
- logger.error("error while calling spill() on " + consumerToSpill, e); + logger.error("error while calling spill() on {}", e, + MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill)); throw new RuntimeException(e.getMessage()); } catch (IOException e) { - logger.error("error while calling spill() on " + consumerToSpill, e); + logger.error("error while calling spill() on {}", e, + MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, consumerToSpill)); // checkstyle.off: RegexpSinglelineJava throw new SparkOutOfMemoryError("error while calling spill() on " + consumerToSpill + " : " + e.getMessage()); @@ -270,24 +274,29 @@ public class TaskMemoryManager { * Dump the memory usage of all consumers. */ public void showMemoryUsage() { - logger.info("Memory used in task " + taskAttemptId); + logger.info("Memory used in task {}", + MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskAttemptId)); synchronized (this) { long memoryAccountedForByConsumers = 0; for (MemoryConsumer c: consumers) { long totalMemUsage = c.getUsed(); memoryAccountedForByConsumers += totalMemUsage; if (totalMemUsage > 0) { - logger.info("Acquired by " + c + ": " + Utils.bytesToString(totalMemUsage)); + logger.info("Acquired by {}: {}", + MDC.of(LogKeys.MEMORY_CONSUMER$.MODULE$, c), + MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(totalMemUsage))); } } long memoryNotAccountedFor = memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers; logger.info( "{} bytes of memory were used by task {} but are not associated with specific consumers", - memoryNotAccountedFor, taskAttemptId); + MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, memoryNotAccountedFor), + MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskAttemptId)); logger.info( "{} bytes of memory are used for execution and {} bytes of memory are used for storage", - memoryManager.executionMemoryUsed(), memoryManager.storageMemoryUsed()); + MDC.of(LogKeys.EXECUTION_MEMORY_SIZE$.MODULE$, memoryManager.executionMemoryUsed()), + 
MDC.of(LogKeys.STORAGE_MEMORY_SIZE$.MODULE$, memoryManager.storageMemoryUsed())); } } @@ -333,7 +342,8 @@ public class TaskMemoryManager { try { page = memoryManager.tungstenMemoryAllocator().allocate(acquired); } catch (OutOfMemoryError e) { - logger.warn("Failed to allocate a page ({} bytes), try again.", acquired); + logger.warn("Failed to allocate a page ({} bytes), try again.", + MDC.of(LogKeys.PAGE_SIZE$.MODULE$, acquired)); // there is no enough memory actually, it means the actual free memory is smaller than // MemoryManager thought, we should keep the acquired memory. synchronized (this) { diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index d067c870acc9..284d1dd036b4 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -33,9 +33,11 @@ import scala.Tuple2; import scala.collection.Iterator; import com.google.common.io.Closeables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.spark.internal.Logger; +import org.apache.spark.internal.LoggerFactory; +import org.apache.spark.internal.LogKeys; +import org.apache.spark.internal.MDC; import org.apache.spark.Partitioner; import org.apache.spark.ShuffleDependency; import org.apache.spark.SparkConf; @@ -223,7 +225,8 @@ final class BypassMergeSortShuffleWriter<K, V> writePartitionedDataWithStream(file, writer); } if (!file.delete()) { - logger.error("Unable to delete file for partition {}", i); + logger.error("Unable to delete file for partition {}", + MDC.of(LogKeys.PARTITION_ID$.MODULE$, i)); } } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java index b097089282ce..8fe432cfe239 100644 --- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java @@ -23,17 +23,19 @@ import java.io.IOException; import java.util.LinkedList; import java.util.zip.Checksum; -import org.apache.spark.SparkException; import scala.Tuple2; import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.spark.SparkConf; +import org.apache.spark.SparkException; import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.internal.config.package$; +import org.apache.spark.internal.Logger; +import org.apache.spark.internal.LoggerFactory; +import org.apache.spark.internal.LogKeys; +import org.apache.spark.internal.MDC; import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.SparkOutOfMemoryError; import org.apache.spark.memory.TaskMemoryManager; @@ -159,11 +161,11 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck if (!isFinalFile) { logger.info( "Task {} on Thread {} spilling sort data of {} to disk ({} {} so far)", - taskContext.taskAttemptId(), - Thread.currentThread().getId(), - Utils.bytesToString(getMemoryUsage()), - spills.size(), - spills.size() != 1 ? " times" : " time"); + MDC.of(LogKeys.TASK_ATTEMPT_ID$.MODULE$, taskContext.taskAttemptId()), + MDC.of(LogKeys.THREAD_ID$.MODULE$, Thread.currentThread().getId()), + MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(getMemoryUsage())), + MDC.of(LogKeys.NUM_SPILL_INFOS$.MODULE$, spills.size()), + MDC.of(LogKeys.SPILL_TIMES$.MODULE$, spills.size() != 1 ? "times" : "time")); } // This call performs the actual sort. 
@@ -349,7 +351,8 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck } for (SpillInfo spill : spills) { if (spill.file.exists() && !spill.file.delete()) { - logger.error("Unable to delete spill file {}", spill.file.getPath()); + logger.error("Unable to delete spill file {}", + MDC.of(LogKeys.PATH$.MODULE$, spill.file.getPath())); } } } @@ -416,8 +419,8 @@ final class ShuffleExternalSorter extends MemoryConsumer implements ShuffleCheck // for tests assert(inMemSorter != null); if (inMemSorter.numRecords() >= numElementsForSpillThreshold) { - logger.info("Spilling data because number of spilledRecords crossed the threshold " + - numElementsForSpillThreshold); + logger.info("Spilling data because number of spilledRecords crossed the threshold {}" + + MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, numElementsForSpillThreshold)); spill(); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index f5949d6ae7a5..6da9d3def3f8 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -35,12 +35,14 @@ import scala.reflect.ClassTag$; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.spark.*; import org.apache.spark.annotation.Private; import org.apache.spark.internal.config.package$; +import org.apache.spark.internal.Logger; +import org.apache.spark.internal.LoggerFactory; +import org.apache.spark.internal.LogKeys; +import org.apache.spark.internal.MDC; import org.apache.spark.io.CompressionCodec; import org.apache.spark.io.CompressionCodec$; import org.apache.spark.io.NioBufferedFileInputStream; @@ -226,7 +228,8 @@ public class UnsafeShuffleWriter<K, V> 
extends ShuffleWriter<K, V> { sorter = null; for (SpillInfo spill : spills) { if (spill.file.exists() && !spill.file.delete()) { - logger.error("Error while deleting spill file {}", spill.file.getPath()); + logger.error("Error while deleting spill file {}", + MDC.of(LogKeys.PATH$.MODULE$, spill.file.getPath())); } } } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java index efe508d1361c..fbf4abc160b6 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java @@ -26,10 +26,11 @@ import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; import java.util.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.spark.SparkConf; +import org.apache.spark.internal.Logger; +import org.apache.spark.internal.LoggerFactory; +import org.apache.spark.internal.LogKeys; +import org.apache.spark.internal.MDC; import org.apache.spark.shuffle.api.ShuffleMapOutputWriter; import org.apache.spark.shuffle.api.ShufflePartitionWriter; import org.apache.spark.shuffle.api.WritableByteChannelWrapper; @@ -123,7 +124,8 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter { public void abort(Throwable error) throws IOException { cleanUp(); if (outputTempFile != null && outputTempFile.exists() && !outputTempFile.delete()) { - log.warn("Failed to delete temporary shuffle file at {}", outputTempFile.getAbsolutePath()); + log.warn("Failed to delete temporary shuffle file at {}", + MDC.of(LogKeys.PATH$.MODULE$, outputTempFile.getAbsolutePath())); } } diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 35c5efc77f6f..3506e2a88864 100644 
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -25,11 +25,13 @@ import java.util.LinkedList; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Closeables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.spark.SparkEnv; import org.apache.spark.executor.ShuffleWriteMetrics; +import org.apache.spark.internal.LogKeys; +import org.apache.spark.internal.Logger; +import org.apache.spark.internal.LoggerFactory; +import org.apache.spark.internal.MDC; import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.SparkOutOfMemoryError; import org.apache.spark.memory.TaskMemoryManager; @@ -392,7 +394,8 @@ public final class BytesToBytesMap extends MemoryConsumer { // remove the spill file from disk File file = spillWriters.removeFirst().getFile(); if (file != null && file.exists() && !file.delete()) { - logger.error("Was unable to delete spill file {}", file.getAbsolutePath()); + logger.error("Was unable to delete spill file {}", + MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath())); } } } @@ -893,7 +896,8 @@ public final class BytesToBytesMap extends MemoryConsumer { File file = spillWriters.removeFirst().getFile(); if (file != null && file.exists()) { if (!file.delete()) { - logger.error("Was unable to delete spill file {}", file.getAbsolutePath()); + logger.error("Was unable to delete spill file {}", + MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath())); } } } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 2f9e1a9f4546..0be312d48a9d 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -28,11 
+28,13 @@ import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; +import org.apache.spark.internal.LogKeys; +import org.apache.spark.internal.Logger; +import org.apache.spark.internal.LoggerFactory; +import org.apache.spark.internal.MDC; import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.SparkOutOfMemoryError; import org.apache.spark.memory.TaskMemoryManager; @@ -217,10 +219,10 @@ public final class UnsafeExternalSorter extends MemoryConsumer { } logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", - Thread.currentThread().getId(), - Utils.bytesToString(getMemoryUsage()), - spillWriters.size(), - spillWriters.size() > 1 ? " times" : " time"); + MDC.of(LogKeys.THREAD_ID$.MODULE$, Thread.currentThread().getId()), + MDC.of(LogKeys.MEMORY_SIZE$.MODULE$, Utils.bytesToString(getMemoryUsage())), + MDC.of(LogKeys.NUM_SPILL_WRITERS$.MODULE$, spillWriters.size()), + MDC.of(LogKeys.SPILL_TIMES$.MODULE$, spillWriters.size() > 1 ? 
"times" : "time")); ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics(); @@ -335,7 +337,8 @@ public final class UnsafeExternalSorter extends MemoryConsumer { File file = spill.getFile(); if (file != null && file.exists()) { if (!file.delete()) { - logger.error("Was unable to delete spill file {}", file.getAbsolutePath()); + logger.error("Was unable to delete spill file {}", + MDC.of(LogKeys.PATH$.MODULE$, file.getAbsolutePath())); } } } @@ -476,8 +479,8 @@ public final class UnsafeExternalSorter extends MemoryConsumer { assert(inMemSorter != null); if (inMemSorter.numRecords() >= numElementsForSpillThreshold) { - logger.info("Spilling data because number of spilledRecords crossed the threshold " + - numElementsForSpillThreshold); + logger.info("Spilling data because number of spilledRecords crossed the threshold {}", + MDC.of(LogKeys.NUM_ELEMENTS_SPILL_THRESHOLD$.MODULE$, numElementsForSpillThreshold)); spill(); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index cf29835b2ce8..4eff6a70acca 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java @@ -23,13 +23,13 @@ import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; import org.apache.spark.internal.config.package$; import org.apache.spark.internal.config.ConfigEntry; +import org.apache.spark.internal.Logger; +import org.apache.spark.internal.LoggerFactory; import org.apache.spark.io.NioBufferedFileInputStream; import org.apache.spark.io.ReadAheadInputStream; import org.apache.spark.serializer.SerializerManager; import org.apache.spark.storage.BlockId; import org.apache.spark.unsafe.Platform; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.*; 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org