This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c01f196afc3 [SPARK-48059][CORE] Implement the structured log 
framework on the java side
5c01f196afc3 is described below

commit 5c01f196afc3ba75f10c4aedf2c8405b6f59336a
Author: panbingkun <panbing...@baidu.com>
AuthorDate: Fri May 3 16:30:36 2024 -0700

    [SPARK-48059][CORE] Implement the structured log framework on the java side
    
    ### What changes were proposed in this pull request?
    The pr aims to implement the structured log framework on the `java side`.
    
    ### Why are the changes needed?
    Currently, the structured log framework on the `scala side` is basically 
available, but the`Spark Core` code also includes some `Java code`, which also 
needs to be connected to the structured log framework.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, only for developers.
    
    ### How was this patch tested?
    - Add some new UT.
    - Pass GA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46301 from panbingkun/structured_logger_java.
    
    Authored-by: panbingkun <panbing...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../java/org/apache/spark/internal/Logger.java     | 184 +++++++++++++++
 .../org/apache/spark/internal/LoggerFactory.java   |  26 +++
 .../scala/org/apache/spark/internal/Logging.scala  |   4 +
 .../org/apache/spark/util/LoggerSuiteBase.java     | 248 +++++++++++++++++++++
 .../org/apache/spark/util/PatternLoggerSuite.java  |  89 ++++++++
 .../apache/spark/util/StructuredLoggerSuite.java   | 164 ++++++++++++++
 common/utils/src/test/resources/log4j2.properties  |  28 ++-
 .../apache/spark/util/StructuredLoggingSuite.scala |   8 +-
 8 files changed, 739 insertions(+), 12 deletions(-)

diff --git a/common/utils/src/main/java/org/apache/spark/internal/Logger.java 
b/common/utils/src/main/java/org/apache/spark/internal/Logger.java
new file mode 100644
index 000000000000..f252f44b3b76
--- /dev/null
+++ b/common/utils/src/main/java/org/apache/spark/internal/Logger.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.logging.log4j.CloseableThreadContext;
+import org.apache.logging.log4j.message.MessageFactory;
+import org.apache.logging.log4j.message.ParameterizedMessageFactory;
+
+public class Logger {
+
+  private static final MessageFactory MESSAGE_FACTORY = 
ParameterizedMessageFactory.INSTANCE;
+  private final org.slf4j.Logger slf4jLogger;
+
+  Logger(org.slf4j.Logger slf4jLogger) {
+    this.slf4jLogger = slf4jLogger;
+  }
+
+  public void error(String msg) {
+    slf4jLogger.error(msg);
+  }
+
+  public void error(String msg, Throwable throwable) {
+    slf4jLogger.error(msg, throwable);
+  }
+
+  public void error(String msg, MDC... mdcs) {
+    if (mdcs == null || mdcs.length == 0) {
+      slf4jLogger.error(msg);
+    } else if (slf4jLogger.isErrorEnabled()) {
+      withLogContext(msg, mdcs, null, mt -> slf4jLogger.error(mt.message));
+    }
+  }
+
+  public void error(String msg, Throwable throwable, MDC... mdcs) {
+    if (mdcs == null || mdcs.length == 0) {
+      slf4jLogger.error(msg, throwable);
+    } else if (slf4jLogger.isErrorEnabled()) {
+      withLogContext(msg, mdcs, throwable, mt -> slf4jLogger.error(mt.message, 
mt.throwable));
+    }
+  }
+
+  public void warn(String msg) {
+    slf4jLogger.warn(msg);
+  }
+
+  public void warn(String msg, Throwable throwable) {
+    slf4jLogger.warn(msg, throwable);
+  }
+
+  public void warn(String msg, MDC... mdcs) {
+    if (mdcs == null || mdcs.length == 0) {
+      slf4jLogger.warn(msg);
+    } else if (slf4jLogger.isWarnEnabled()) {
+      withLogContext(msg, mdcs, null, mt -> slf4jLogger.warn(mt.message));
+    }
+  }
+
+  public void warn(String msg, Throwable throwable, MDC... mdcs) {
+    if (mdcs == null || mdcs.length == 0) {
+      slf4jLogger.warn(msg);
+    } else if (slf4jLogger.isWarnEnabled()) {
+      withLogContext(msg, mdcs, throwable, mt -> slf4jLogger.warn(mt.message, 
mt.throwable));
+    }
+  }
+
+  public void info(String msg) {
+    slf4jLogger.info(msg);
+  }
+
+  public void info(String msg, Throwable throwable) {
+    slf4jLogger.info(msg, throwable);
+  }
+
+  public void info(String msg, MDC... mdcs) {
+    if (mdcs == null || mdcs.length == 0) {
+      slf4jLogger.info(msg);
+    } else if (slf4jLogger.isInfoEnabled()) {
+      withLogContext(msg, mdcs, null, mt -> slf4jLogger.info(mt.message));
+    }
+  }
+
+  public void info(String msg, Throwable throwable, MDC... mdcs) {
+    if (mdcs == null || mdcs.length == 0) {
+      slf4jLogger.info(msg);
+    } else if (slf4jLogger.isInfoEnabled()) {
+      withLogContext(msg, mdcs, throwable, mt -> slf4jLogger.info(mt.message, 
mt.throwable));
+    }
+  }
+
+  public void debug(String msg) {
+    slf4jLogger.debug(msg);
+  }
+
+  public void debug(String msg, Throwable throwable) {
+    slf4jLogger.debug(msg, throwable);
+  }
+
+  public void debug(String msg, MDC... mdcs) {
+    if (mdcs == null || mdcs.length == 0) {
+      slf4jLogger.debug(msg);
+    } else if (slf4jLogger.isDebugEnabled()) {
+      withLogContext(msg, mdcs, null, mt -> slf4jLogger.debug(mt.message));
+    }
+  }
+
+  public void debug(String msg, Throwable throwable, MDC... mdcs) {
+    if (mdcs == null || mdcs.length == 0) {
+      slf4jLogger.debug(msg);
+    } else if (slf4jLogger.isDebugEnabled()) {
+      withLogContext(msg, mdcs, throwable, mt -> slf4jLogger.debug(mt.message, 
mt.throwable));
+    }
+  }
+
+  public void trace(String msg) {
+    slf4jLogger.trace(msg);
+  }
+
+  public void trace(String msg, Throwable throwable) {
+    slf4jLogger.trace(msg, throwable);
+  }
+
+  public void trace(String msg, MDC... mdcs) {
+    if (mdcs == null || mdcs.length == 0) {
+      slf4jLogger.trace(msg);
+    } else if (slf4jLogger.isTraceEnabled()) {
+      withLogContext(msg, mdcs, null, mt -> slf4jLogger.trace(mt.message));
+    }
+  }
+
+  public void trace(String msg, Throwable throwable, MDC... mdcs) {
+    if (mdcs == null || mdcs.length == 0) {
+      slf4jLogger.trace(msg);
+    } else if (slf4jLogger.isTraceEnabled()) {
+      withLogContext(msg, mdcs, throwable, mt -> slf4jLogger.trace(mt.message, 
mt.throwable));
+    }
+  }
+
+  private void withLogContext(
+      String pattern,
+      MDC[] mdcs,
+      Throwable throwable,
+      Consumer<MessageThrowable> func) {
+    Map<String, String> context = new HashMap<>();
+    Object[] args = new Object[mdcs.length];
+    for (int index = 0; index < mdcs.length; index++) {
+      MDC mdc = mdcs[index];
+      String value = (mdc.value() != null) ? mdc.value().toString() : null;
+      if (Logging$.MODULE$.isStructuredLoggingEnabled()) {
+        context.put(mdc.key().name(), value);
+      }
+      args[index] = value;
+    }
+    MessageThrowable messageThrowable = MessageThrowable.of(
+        MESSAGE_FACTORY.newMessage(pattern, args).getFormattedMessage(), 
throwable);
+    try (CloseableThreadContext.Instance ignored = 
CloseableThreadContext.putAll(context)) {
+      func.accept(messageThrowable);
+    }
+  }
+
+  private record MessageThrowable(String message, Throwable throwable) {
+    static MessageThrowable of(String message, Throwable throwable) {
+      return new MessageThrowable(message, throwable);
+    }
+  }
+}
diff --git 
a/common/utils/src/main/java/org/apache/spark/internal/LoggerFactory.java 
b/common/utils/src/main/java/org/apache/spark/internal/LoggerFactory.java
new file mode 100644
index 000000000000..34141fd55a4a
--- /dev/null
+++ b/common/utils/src/main/java/org/apache/spark/internal/LoggerFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal;
+
+public class LoggerFactory {
+
+  public static Logger getLogger(Class<?> clazz) {
+    org.slf4j.Logger slf4jLogger = org.slf4j.LoggerFactory.getLogger(clazz);
+    return new Logger(slf4jLogger);
+  }
+}
diff --git 
a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala 
b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
index 24a60f88c24a..2ea61358b6ad 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala
@@ -39,6 +39,10 @@ case class MDC(key: LogKey, value: Any) {
     "the class of value cannot be MessageWithContext")
 }
 
+object MDC {
+  def of(key: LogKey, value: Any): MDC = MDC(key, value)
+}
+
 /**
  * Wrapper class for log messages that include a logging context.
  * This is used as the return type of the string interpolator 
`LogStringContext`.
diff --git 
a/common/utils/src/test/java/org/apache/spark/util/LoggerSuiteBase.java 
b/common/utils/src/test/java/org/apache/spark/util/LoggerSuiteBase.java
new file mode 100644
index 000000000000..cdc06f6fc261
--- /dev/null
+++ b/common/utils/src/test/java/org/apache/spark/util/LoggerSuiteBase.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.logging.log4j.Level;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LogKeys;
+import org.apache.spark.internal.MDC;
+
+public abstract class LoggerSuiteBase {
+
+  abstract Logger logger();
+  abstract String className();
+  abstract String logFilePath();
+
+  private File logFile() throws IOException {
+    String pwd = new File(".").getCanonicalPath();
+    return new File(pwd + File.separator + logFilePath());
+  }
+
+  // Return the newly added log contents in the log file after executing the 
function `f`
+  private String captureLogOutput(Runnable func) throws IOException {
+    String content = "";
+    if (logFile().exists()) {
+      content = Files.readString(logFile().toPath());
+    }
+    func.run();
+    String newContent = Files.readString(logFile().toPath());
+    return newContent.substring(content.length());
+  }
+
+  private String basicMsg() {
+    return "This is a log message";
+  }
+
+  private final MDC executorIDMDC = MDC.of(LogKeys.EXECUTOR_ID$.MODULE$, "1");
+  private final String msgWithMDC = "Lost executor {}.";
+
+  private final MDC[] mdcs = new MDC[] {
+      MDC.of(LogKeys.EXECUTOR_ID$.MODULE$, "1"),
+      MDC.of(LogKeys.REASON$.MODULE$, "the shuffle data is too large")};
+  private final String msgWithMDCs = "Lost executor {}, reason: {}";
+
+  private final MDC executorIDMDCValueIsNull = 
MDC.of(LogKeys.EXECUTOR_ID$.MODULE$, null);
+
+  private final MDC externalSystemCustomLog =
+    MDC.of(CustomLogKeys.CUSTOM_LOG_KEY$.MODULE$, "External system custom log 
message.");
+
+  // test for basic message (without any mdc)
+  abstract String expectedPatternForBasicMsg(Level level);
+
+  // test for basic message and exception
+  abstract String expectedPatternForBasicMsgWithException(Level level);
+
+  // test for message (with mdc)
+  abstract String expectedPatternForMsgWithMDC(Level level);
+
+  // test for message (with mdcs)
+  abstract String expectedPatternForMsgWithMDCs(Level level);
+
+  // test for message (with mdcs and exception)
+  abstract String expectedPatternForMsgWithMDCsAndException(Level level);
+
+  // test for message (with mdc - the value is null)
+  abstract String expectedPatternForMsgWithMDCValueIsNull(Level level);
+
+  // test for external system custom LogKey
+  abstract String expectedPatternForExternalSystemCustomLogKey(Level level);
+
+  @Test
+  public void testBasicMsgLogger() {
+    Runnable errorFn = () -> logger().error(basicMsg());
+    Runnable warnFn = () -> logger().warn(basicMsg());
+    Runnable infoFn = () -> logger().info(basicMsg());
+    Runnable debugFn = () -> logger().debug(basicMsg());
+    Runnable traceFn = () -> logger().trace(basicMsg());
+    List.of(
+        Pair.of(Level.ERROR, errorFn),
+        Pair.of(Level.WARN, warnFn),
+        Pair.of(Level.INFO, infoFn),
+        Pair.of(Level.DEBUG, debugFn),
+        Pair.of(Level.TRACE, traceFn)).forEach(pair -> {
+      try {
+        assert (captureLogOutput(pair.getRight()).matches(
+            expectedPatternForBasicMsg(pair.getLeft())));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testBasicLoggerWithException() {
+    Throwable exception = new RuntimeException("OOM");
+    Runnable errorFn = () -> logger().error(basicMsg(), exception);
+    Runnable warnFn = () -> logger().warn(basicMsg(), exception);
+    Runnable infoFn = () -> logger().info(basicMsg(), exception);
+    Runnable debugFn = () -> logger().debug(basicMsg(), exception);
+    Runnable traceFn = () -> logger().trace(basicMsg(), exception);
+    List.of(
+        Pair.of(Level.ERROR, errorFn),
+        Pair.of(Level.WARN, warnFn),
+        Pair.of(Level.INFO, infoFn),
+        Pair.of(Level.DEBUG, debugFn),
+        Pair.of(Level.TRACE, traceFn)).forEach(pair -> {
+      try {
+        assert (captureLogOutput(pair.getRight()).matches(
+            expectedPatternForBasicMsgWithException(pair.getLeft())));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testLoggerWithMDC() {
+    Runnable errorFn = () -> logger().error(msgWithMDC, executorIDMDC);
+    Runnable warnFn = () -> logger().warn(msgWithMDC, executorIDMDC);
+    Runnable infoFn = () -> logger().info(msgWithMDC, executorIDMDC);
+    Runnable debugFn = () -> logger().debug(msgWithMDC, executorIDMDC);
+    Runnable traceFn = () -> logger().trace(msgWithMDC, executorIDMDC);
+    List.of(
+        Pair.of(Level.ERROR, errorFn),
+        Pair.of(Level.WARN, warnFn),
+        Pair.of(Level.INFO, infoFn),
+        Pair.of(Level.DEBUG, debugFn),
+        Pair.of(Level.TRACE, traceFn)).forEach(pair -> {
+      try {
+        assert (captureLogOutput(pair.getRight()).matches(
+            expectedPatternForMsgWithMDC(pair.getLeft())));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testLoggerWithMDCs() {
+    Runnable errorFn = () -> logger().error(msgWithMDCs, mdcs);
+    Runnable warnFn = () -> logger().warn(msgWithMDCs, mdcs);
+    Runnable infoFn = () -> logger().info(msgWithMDCs, mdcs);
+    Runnable debugFn = () -> logger().debug(msgWithMDCs, mdcs);
+    Runnable traceFn = () -> logger().trace(msgWithMDCs, mdcs);
+    List.of(
+        Pair.of(Level.ERROR, errorFn),
+        Pair.of(Level.WARN, warnFn),
+        Pair.of(Level.INFO, infoFn),
+        Pair.of(Level.DEBUG, debugFn),
+        Pair.of(Level.TRACE, traceFn)).forEach(pair -> {
+      try {
+        assert (captureLogOutput(pair.getRight()).matches(
+            expectedPatternForMsgWithMDCs(pair.getLeft())));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testLoggerWithMDCsAndException() {
+    Throwable exception = new RuntimeException("OOM");
+    Runnable errorFn = () -> logger().error(msgWithMDCs, exception, mdcs);
+    Runnable warnFn = () -> logger().warn(msgWithMDCs, exception, mdcs);
+    Runnable infoFn = () -> logger().info(msgWithMDCs, exception, mdcs);
+    Runnable debugFn = () -> logger().debug(msgWithMDCs, exception, mdcs);
+    Runnable traceFn = () -> logger().trace(msgWithMDCs, exception, mdcs);
+    List.of(
+        Pair.of(Level.ERROR, errorFn),
+        Pair.of(Level.WARN, warnFn),
+        Pair.of(Level.INFO, infoFn),
+        Pair.of(Level.DEBUG, debugFn),
+        Pair.of(Level.TRACE, traceFn)).forEach(pair -> {
+      try {
+        assert (captureLogOutput(pair.getRight()).matches(
+            expectedPatternForMsgWithMDCsAndException(pair.getLeft())));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testLoggerWithMDCValueIsNull() {
+    Runnable errorFn = () -> logger().error(msgWithMDC, 
executorIDMDCValueIsNull);
+    Runnable warnFn = () -> logger().warn(msgWithMDC, 
executorIDMDCValueIsNull);
+    Runnable infoFn = () -> logger().info(msgWithMDC, 
executorIDMDCValueIsNull);
+    Runnable debugFn = () -> logger().debug(msgWithMDC, 
executorIDMDCValueIsNull);
+    Runnable traceFn = () -> logger().trace(msgWithMDC, 
executorIDMDCValueIsNull);
+    List.of(
+        Pair.of(Level.ERROR, errorFn),
+        Pair.of(Level.WARN, warnFn),
+        Pair.of(Level.INFO, infoFn),
+        Pair.of(Level.DEBUG, debugFn),
+        Pair.of(Level.TRACE, traceFn)).forEach(pair -> {
+      try {
+        assert (captureLogOutput(pair.getRight()).matches(
+            expectedPatternForMsgWithMDCValueIsNull(pair.getLeft())));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  @Test
+  public void testLoggerWithExternalSystemCustomLogKey() {
+    Runnable errorFn = () -> logger().error("{}", externalSystemCustomLog);
+    Runnable warnFn = () -> logger().warn("{}", externalSystemCustomLog);
+    Runnable infoFn = () -> logger().info("{}", externalSystemCustomLog);
+    Runnable debugFn = () -> logger().debug("{}", externalSystemCustomLog);
+    Runnable traceFn = () -> logger().trace("{}", externalSystemCustomLog);
+    List.of(
+        Pair.of(Level.ERROR, errorFn),
+        Pair.of(Level.WARN, warnFn),
+        Pair.of(Level.INFO, infoFn),
+        Pair.of(Level.DEBUG, debugFn),
+        Pair.of(Level.TRACE, traceFn)).forEach(pair -> {
+      try {
+        assert (captureLogOutput(pair.getRight()).matches(
+            expectedPatternForExternalSystemCustomLogKey(pair.getLeft())));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+}
diff --git 
a/common/utils/src/test/java/org/apache/spark/util/PatternLoggerSuite.java 
b/common/utils/src/test/java/org/apache/spark/util/PatternLoggerSuite.java
new file mode 100644
index 000000000000..13b6a1d05470
--- /dev/null
+++ b/common/utils/src/test/java/org/apache/spark/util/PatternLoggerSuite.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util;
+
+import org.apache.logging.log4j.Level;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+
+public class PatternLoggerSuite extends LoggerSuiteBase {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PatternLoggerSuite.class);
+
+  private String toRegexPattern(Level level, String msg) {
+    return msg
+        .replace("<level>", level.toString())
+        .replace("<className>", className());
+  }
+
+  @Override
+  Logger logger() {
+    return LOGGER;
+  }
+
+  @Override
+  String className() {
+    return PatternLoggerSuite.class.getSimpleName();
+  }
+
+  @Override
+  String logFilePath() {
+    return "target/pattern.log";
+  }
+
+  @Override
+  String expectedPatternForBasicMsg(Level level) {
+    return toRegexPattern(level, ".*<level> <className>: This is a log 
message\n");
+  }
+
+  @Override
+  String expectedPatternForBasicMsgWithException(Level level) {
+    return toRegexPattern(level, """
+        .*<level> <className>: This is a log message
+        [\\s\\S]*""");
+  }
+
+  @Override
+  String expectedPatternForMsgWithMDC(Level level) {
+    return toRegexPattern(level, ".*<level> <className>: Lost executor 1.\n");
+  }
+
+  @Override
+  String expectedPatternForMsgWithMDCs(Level level) {
+    return toRegexPattern(level,
+      ".*<level> <className>: Lost executor 1, reason: the shuffle data is too 
large\n");
+  }
+
+  @Override
+  String expectedPatternForMsgWithMDCsAndException(Level level) {
+    return toRegexPattern(level,"""
+      .*<level> <className>: Lost executor 1, reason: the shuffle data is too 
large
+      [\\s\\S]*""");
+  }
+
+  @Override
+  String expectedPatternForMsgWithMDCValueIsNull(Level level) {
+    return toRegexPattern(level, ".*<level> <className>: Lost executor 
null.\n");
+  }
+
+  @Override
+  String expectedPatternForExternalSystemCustomLogKey(Level level) {
+    return toRegexPattern(level, ".*<level> <className>: External system 
custom log message.\n");
+  }
+}
diff --git 
a/common/utils/src/test/java/org/apache/spark/util/StructuredLoggerSuite.java 
b/common/utils/src/test/java/org/apache/spark/util/StructuredLoggerSuite.java
new file mode 100644
index 000000000000..c1b31bf68a7d
--- /dev/null
+++ 
b/common/utils/src/test/java/org/apache/spark/util/StructuredLoggerSuite.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.logging.log4j.Level;
+
+import org.apache.spark.internal.Logger;
+import org.apache.spark.internal.LoggerFactory;
+
+public class StructuredLoggerSuite extends LoggerSuiteBase {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StructuredLoggerSuite.class);
+
+  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+  private String compactAndToRegexPattern(Level level, String json) {
+    try {
+      return JSON_MAPPER.readTree(json).toString()
+         .replace("<level>", level.toString())
+         .replace("<className>", className())
+         .replace("<timestamp>", "[^\"]+")
+         .replace("\"<stacktrace>\"", ".*")
+         .replace("{", "\\{") + "\n";
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  Logger logger() {
+    return LOGGER;
+  }
+
+  @Override
+  String className() {
+    return StructuredLoggerSuite.class.getSimpleName();
+  }
+
+  @Override
+  String logFilePath() {
+    return "target/structured.log";
+  }
+
+  @Override
+  String expectedPatternForBasicMsg(Level level) {
+    return compactAndToRegexPattern(level, """
+      {
+        "ts": "<timestamp>",
+        "level": "<level>",
+        "msg": "This is a log message",
+        "logger": "<className>"
+      }""");
+  }
+
+  @Override
+  String expectedPatternForBasicMsgWithException(Level level) {
+    return compactAndToRegexPattern(level, """
+      {
+        "ts": "<timestamp>",
+        "level": "<level>",
+        "msg": "This is a log message",
+        "exception": {
+          "class": "java.lang.RuntimeException",
+          "msg": "OOM",
+          "stacktrace": "<stacktrace>"
+        },
+        "logger": "<className>"
+      }""");
+  }
+
+  @Override
+  String expectedPatternForMsgWithMDC(Level level) {
+    return compactAndToRegexPattern(level, """
+      {
+        "ts": "<timestamp>",
+        "level": "<level>",
+        "msg": "Lost executor 1.",
+        "context": {
+          "executor_id": "1"
+        },
+        "logger": "<className>"
+      }""");
+  }
+
+  @Override
+  String expectedPatternForMsgWithMDCs(Level level) {
+    return compactAndToRegexPattern(level, """
+      {
+        "ts": "<timestamp>",
+        "level": "<level>",
+        "msg": "Lost executor 1, reason: the shuffle data is too large",
+        "context": {
+          "executor_id": "1",
+          "reason": "the shuffle data is too large"
+        },
+        "logger": "<className>"
+      }""");
+  }
+
+  @Override
+  String expectedPatternForMsgWithMDCsAndException(Level level) {
+    return compactAndToRegexPattern(level, """
+      {
+        "ts": "<timestamp>",
+        "level": "<level>",
+        "msg": "Lost executor 1, reason: the shuffle data is too large",
+        "context": {
+          "executor_id": "1",
+          "reason": "the shuffle data is too large"
+        },
+        "exception": {
+          "class": "java.lang.RuntimeException",
+          "msg": "OOM",
+          "stacktrace": "<stacktrace>"
+        },
+        "logger": "<className>"
+      }""");
+  }
+
+  @Override
+  String expectedPatternForMsgWithMDCValueIsNull(Level level) {
+    return compactAndToRegexPattern(level, """
+      {
+        "ts": "<timestamp>",
+        "level": "<level>",
+        "msg": "Lost executor null.",
+        "context": {
+          "executor_id": null
+        },
+        "logger": "<className>"
+      }""");
+  }
+
+  @Override
+  String expectedPatternForExternalSystemCustomLogKey(Level level) {
+    return compactAndToRegexPattern(level, """
+      {
+        "ts": "<timestamp>",
+        "level": "<level>",
+        "msg": "External system custom log message.",
+        "context": {
+          "custom_log_key": "External system custom log message."
+        },
+        "logger": "<className>"
+      }""");
+  }
+}
+
diff --git a/common/utils/src/test/resources/log4j2.properties 
b/common/utils/src/test/resources/log4j2.properties
index e3bd8689993d..3e6dc42d3d5d 100644
--- a/common/utils/src/test/resources/log4j2.properties
+++ b/common/utils/src/test/resources/log4j2.properties
@@ -39,12 +39,22 @@ appender.pattern.layout.type = PatternLayout
 appender.pattern.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
 
 # Custom loggers
-logger.structured.name = org.apache.spark.util.StructuredLoggingSuite
-logger.structured.level = trace
-logger.structured.appenderRefs = structured
-logger.structured.appenderRef.structured.ref = structured
-
-logger.pattern.name = org.apache.spark.util.PatternLoggingSuite
-logger.pattern.level = trace
-logger.pattern.appenderRefs = pattern
-logger.pattern.appenderRef.pattern.ref = pattern
+logger.structured_logging.name = org.apache.spark.util.StructuredLoggingSuite
+logger.structured_logging.level = trace
+logger.structured_logging.appenderRefs = structured
+logger.structured_logging.appenderRef.structured.ref = structured
+
+logger.pattern_logging.name = org.apache.spark.util.PatternLoggingSuite
+logger.pattern_logging.level = trace
+logger.pattern_logging.appenderRefs = pattern
+logger.pattern_logging.appenderRef.pattern.ref = pattern
+
+logger.structured_logger.name = org.apache.spark.util.StructuredLoggerSuite
+logger.structured_logger.level = trace
+logger.structured_logger.appenderRefs = structured
+logger.structured_logger.appenderRef.structured.ref = structured
+
+logger.pattern_logger.name = org.apache.spark.util.PatternLoggerSuite
+logger.pattern_logger.level = trace
+logger.pattern_logger.appenderRefs = pattern
+logger.pattern_logger.appenderRef.pattern.ref = pattern
diff --git 
a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
 
b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
index 33a701aaed0f..2152b57524d7 100644
--- 
a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
+++ 
b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala
@@ -147,7 +147,7 @@ trait LoggingSuiteBase
   }
 
   private val externalSystemCustomLog =
-    log"${MDC(CUSTOM_LOG_KEY, "External system custom log message.")}"
+    log"${MDC(CustomLogKeys.CUSTOM_LOG_KEY, "External system custom log 
message.")}"
   test("Logging with external system custom LogKey") {
     Seq(
       (Level.ERROR, () => logError(externalSystemCustomLog)),
@@ -306,5 +306,7 @@ class StructuredLoggingSuite extends LoggingSuiteBase {
   }
 }
 
-// External system custom LogKey must be `extends LogKey`
-case object CUSTOM_LOG_KEY extends LogKey
+object CustomLogKeys {
+  // External system custom LogKey must be `extends LogKey`
+  case object CUSTOM_LOG_KEY extends LogKey
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to