This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5c01f196afc3 [SPARK-48059][CORE] Implement the structured log framework on the java side 5c01f196afc3 is described below commit 5c01f196afc3ba75f10c4aedf2c8405b6f59336a Author: panbingkun <panbing...@baidu.com> AuthorDate: Fri May 3 16:30:36 2024 -0700 [SPARK-48059][CORE] Implement the structured log framework on the java side ### What changes were proposed in this pull request? The pr aims to implement the structured log framework on the `java side`. ### Why are the changes needed? Currently, the structured log framework on the `scala side` is basically available, but the `Spark Core` code also includes some `Java code`, which also needs to be connected to the structured log framework. ### Does this PR introduce _any_ user-facing change? Yes, only for developers. ### How was this patch tested? - Add some new UT. - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46301 from panbingkun/structured_logger_java. 
Authored-by: panbingkun <panbing...@baidu.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../java/org/apache/spark/internal/Logger.java | 184 +++++++++++++++ .../org/apache/spark/internal/LoggerFactory.java | 26 +++ .../scala/org/apache/spark/internal/Logging.scala | 4 + .../org/apache/spark/util/LoggerSuiteBase.java | 248 +++++++++++++++++++++ .../org/apache/spark/util/PatternLoggerSuite.java | 89 ++++++++ .../apache/spark/util/StructuredLoggerSuite.java | 164 ++++++++++++++ common/utils/src/test/resources/log4j2.properties | 28 ++- .../apache/spark/util/StructuredLoggingSuite.scala | 8 +- 8 files changed, 739 insertions(+), 12 deletions(-) diff --git a/common/utils/src/main/java/org/apache/spark/internal/Logger.java b/common/utils/src/main/java/org/apache/spark/internal/Logger.java new file mode 100644 index 000000000000..f252f44b3b76 --- /dev/null +++ b/common/utils/src/main/java/org/apache/spark/internal/Logger.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.internal; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import org.apache.logging.log4j.CloseableThreadContext; +import org.apache.logging.log4j.message.MessageFactory; +import org.apache.logging.log4j.message.ParameterizedMessageFactory; + +public class Logger { + + private static final MessageFactory MESSAGE_FACTORY = ParameterizedMessageFactory.INSTANCE; + private final org.slf4j.Logger slf4jLogger; + + Logger(org.slf4j.Logger slf4jLogger) { + this.slf4jLogger = slf4jLogger; + } + + public void error(String msg) { + slf4jLogger.error(msg); + } + + public void error(String msg, Throwable throwable) { + slf4jLogger.error(msg, throwable); + } + + public void error(String msg, MDC... mdcs) { + if (mdcs == null || mdcs.length == 0) { + slf4jLogger.error(msg); + } else if (slf4jLogger.isErrorEnabled()) { + withLogContext(msg, mdcs, null, mt -> slf4jLogger.error(mt.message)); + } + } + + public void error(String msg, Throwable throwable, MDC... mdcs) { + if (mdcs == null || mdcs.length == 0) { + slf4jLogger.error(msg, throwable); + } else if (slf4jLogger.isErrorEnabled()) { + withLogContext(msg, mdcs, throwable, mt -> slf4jLogger.error(mt.message, mt.throwable)); + } + } + + public void warn(String msg) { + slf4jLogger.warn(msg); + } + + public void warn(String msg, Throwable throwable) { + slf4jLogger.warn(msg, throwable); + } + + public void warn(String msg, MDC... mdcs) { + if (mdcs == null || mdcs.length == 0) { + slf4jLogger.warn(msg); + } else if (slf4jLogger.isWarnEnabled()) { + withLogContext(msg, mdcs, null, mt -> slf4jLogger.warn(mt.message)); + } + } + + public void warn(String msg, Throwable throwable, MDC... 
mdcs) { + if (mdcs == null || mdcs.length == 0) { + slf4jLogger.warn(msg); + } else if (slf4jLogger.isWarnEnabled()) { + withLogContext(msg, mdcs, throwable, mt -> slf4jLogger.warn(mt.message, mt.throwable)); + } + } + + public void info(String msg) { + slf4jLogger.info(msg); + } + + public void info(String msg, Throwable throwable) { + slf4jLogger.info(msg, throwable); + } + + public void info(String msg, MDC... mdcs) { + if (mdcs == null || mdcs.length == 0) { + slf4jLogger.info(msg); + } else if (slf4jLogger.isInfoEnabled()) { + withLogContext(msg, mdcs, null, mt -> slf4jLogger.info(mt.message)); + } + } + + public void info(String msg, Throwable throwable, MDC... mdcs) { + if (mdcs == null || mdcs.length == 0) { + slf4jLogger.info(msg); + } else if (slf4jLogger.isInfoEnabled()) { + withLogContext(msg, mdcs, throwable, mt -> slf4jLogger.info(mt.message, mt.throwable)); + } + } + + public void debug(String msg) { + slf4jLogger.debug(msg); + } + + public void debug(String msg, Throwable throwable) { + slf4jLogger.debug(msg, throwable); + } + + public void debug(String msg, MDC... mdcs) { + if (mdcs == null || mdcs.length == 0) { + slf4jLogger.debug(msg); + } else if (slf4jLogger.isDebugEnabled()) { + withLogContext(msg, mdcs, null, mt -> slf4jLogger.debug(mt.message)); + } + } + + public void debug(String msg, Throwable throwable, MDC... mdcs) { + if (mdcs == null || mdcs.length == 0) { + slf4jLogger.debug(msg); + } else if (slf4jLogger.isDebugEnabled()) { + withLogContext(msg, mdcs, throwable, mt -> slf4jLogger.debug(mt.message, mt.throwable)); + } + } + + public void trace(String msg) { + slf4jLogger.trace(msg); + } + + public void trace(String msg, Throwable throwable) { + slf4jLogger.trace(msg, throwable); + } + + public void trace(String msg, MDC... 
mdcs) { + if (mdcs == null || mdcs.length == 0) { + slf4jLogger.trace(msg); + } else if (slf4jLogger.isTraceEnabled()) { + withLogContext(msg, mdcs, null, mt -> slf4jLogger.trace(mt.message)); + } + } + + public void trace(String msg, Throwable throwable, MDC... mdcs) { + if (mdcs == null || mdcs.length == 0) { + slf4jLogger.trace(msg); + } else if (slf4jLogger.isTraceEnabled()) { + withLogContext(msg, mdcs, throwable, mt -> slf4jLogger.trace(mt.message, mt.throwable)); + } + } + + private void withLogContext( + String pattern, + MDC[] mdcs, + Throwable throwable, + Consumer<MessageThrowable> func) { + Map<String, String> context = new HashMap<>(); + Object[] args = new Object[mdcs.length]; + for (int index = 0; index < mdcs.length; index++) { + MDC mdc = mdcs[index]; + String value = (mdc.value() != null) ? mdc.value().toString() : null; + if (Logging$.MODULE$.isStructuredLoggingEnabled()) { + context.put(mdc.key().name(), value); + } + args[index] = value; + } + MessageThrowable messageThrowable = MessageThrowable.of( + MESSAGE_FACTORY.newMessage(pattern, args).getFormattedMessage(), throwable); + try (CloseableThreadContext.Instance ignored = CloseableThreadContext.putAll(context)) { + func.accept(messageThrowable); + } + } + + private record MessageThrowable(String message, Throwable throwable) { + static MessageThrowable of(String message, Throwable throwable) { + return new MessageThrowable(message, throwable); + } + } +} diff --git a/common/utils/src/main/java/org/apache/spark/internal/LoggerFactory.java b/common/utils/src/main/java/org/apache/spark/internal/LoggerFactory.java new file mode 100644 index 000000000000..34141fd55a4a --- /dev/null +++ b/common/utils/src/main/java/org/apache/spark/internal/LoggerFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal; + +public class LoggerFactory { + + public static Logger getLogger(Class<?> clazz) { + org.slf4j.Logger slf4jLogger = org.slf4j.LoggerFactory.getLogger(clazz); + return new Logger(slf4jLogger); + } +} diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala index 24a60f88c24a..2ea61358b6ad 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala @@ -39,6 +39,10 @@ case class MDC(key: LogKey, value: Any) { "the class of value cannot be MessageWithContext") } +object MDC { + def of(key: LogKey, value: Any): MDC = MDC(key, value) +} + /** * Wrapper class for log messages that include a logging context. * This is used as the return type of the string interpolator `LogStringContext`. diff --git a/common/utils/src/test/java/org/apache/spark/util/LoggerSuiteBase.java b/common/utils/src/test/java/org/apache/spark/util/LoggerSuiteBase.java new file mode 100644 index 000000000000..cdc06f6fc261 --- /dev/null +++ b/common/utils/src/test/java/org/apache/spark/util/LoggerSuiteBase.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.logging.log4j.Level; +import org.junit.jupiter.api.Test; + +import org.apache.spark.internal.Logger; +import org.apache.spark.internal.LogKeys; +import org.apache.spark.internal.MDC; + +public abstract class LoggerSuiteBase { + + abstract Logger logger(); + abstract String className(); + abstract String logFilePath(); + + private File logFile() throws IOException { + String pwd = new File(".").getCanonicalPath(); + return new File(pwd + File.separator + logFilePath()); + } + + // Return the newly added log contents in the log file after executing the function `f` + private String captureLogOutput(Runnable func) throws IOException { + String content = ""; + if (logFile().exists()) { + content = Files.readString(logFile().toPath()); + } + func.run(); + String newContent = Files.readString(logFile().toPath()); + return newContent.substring(content.length()); + } + + private String basicMsg() { + return "This is a log message"; + } + + private final MDC executorIDMDC = MDC.of(LogKeys.EXECUTOR_ID$.MODULE$, "1"); + private final String msgWithMDC = "Lost executor {}."; + + private 
final MDC[] mdcs = new MDC[] { + MDC.of(LogKeys.EXECUTOR_ID$.MODULE$, "1"), + MDC.of(LogKeys.REASON$.MODULE$, "the shuffle data is too large")}; + private final String msgWithMDCs = "Lost executor {}, reason: {}"; + + private final MDC executorIDMDCValueIsNull = MDC.of(LogKeys.EXECUTOR_ID$.MODULE$, null); + + private final MDC externalSystemCustomLog = + MDC.of(CustomLogKeys.CUSTOM_LOG_KEY$.MODULE$, "External system custom log message."); + + // test for basic message (without any mdc) + abstract String expectedPatternForBasicMsg(Level level); + + // test for basic message and exception + abstract String expectedPatternForBasicMsgWithException(Level level); + + // test for message (with mdc) + abstract String expectedPatternForMsgWithMDC(Level level); + + // test for message (with mdcs) + abstract String expectedPatternForMsgWithMDCs(Level level); + + // test for message (with mdcs and exception) + abstract String expectedPatternForMsgWithMDCsAndException(Level level); + + // test for message (with mdc - the value is null) + abstract String expectedPatternForMsgWithMDCValueIsNull(Level level); + + // test for external system custom LogKey + abstract String expectedPatternForExternalSystemCustomLogKey(Level level); + + @Test + public void testBasicMsgLogger() { + Runnable errorFn = () -> logger().error(basicMsg()); + Runnable warnFn = () -> logger().warn(basicMsg()); + Runnable infoFn = () -> logger().info(basicMsg()); + Runnable debugFn = () -> logger().debug(basicMsg()); + Runnable traceFn = () -> logger().trace(basicMsg()); + List.of( + Pair.of(Level.ERROR, errorFn), + Pair.of(Level.WARN, warnFn), + Pair.of(Level.INFO, infoFn), + Pair.of(Level.DEBUG, debugFn), + Pair.of(Level.TRACE, traceFn)).forEach(pair -> { + try { + assert (captureLogOutput(pair.getRight()).matches( + expectedPatternForBasicMsg(pair.getLeft()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testBasicLoggerWithException() { + Throwable 
exception = new RuntimeException("OOM"); + Runnable errorFn = () -> logger().error(basicMsg(), exception); + Runnable warnFn = () -> logger().warn(basicMsg(), exception); + Runnable infoFn = () -> logger().info(basicMsg(), exception); + Runnable debugFn = () -> logger().debug(basicMsg(), exception); + Runnable traceFn = () -> logger().trace(basicMsg(), exception); + List.of( + Pair.of(Level.ERROR, errorFn), + Pair.of(Level.WARN, warnFn), + Pair.of(Level.INFO, infoFn), + Pair.of(Level.DEBUG, debugFn), + Pair.of(Level.TRACE, traceFn)).forEach(pair -> { + try { + assert (captureLogOutput(pair.getRight()).matches( + expectedPatternForBasicMsgWithException(pair.getLeft()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testLoggerWithMDC() { + Runnable errorFn = () -> logger().error(msgWithMDC, executorIDMDC); + Runnable warnFn = () -> logger().warn(msgWithMDC, executorIDMDC); + Runnable infoFn = () -> logger().info(msgWithMDC, executorIDMDC); + Runnable debugFn = () -> logger().debug(msgWithMDC, executorIDMDC); + Runnable traceFn = () -> logger().trace(msgWithMDC, executorIDMDC); + List.of( + Pair.of(Level.ERROR, errorFn), + Pair.of(Level.WARN, warnFn), + Pair.of(Level.INFO, infoFn), + Pair.of(Level.DEBUG, debugFn), + Pair.of(Level.TRACE, traceFn)).forEach(pair -> { + try { + assert (captureLogOutput(pair.getRight()).matches( + expectedPatternForMsgWithMDC(pair.getLeft()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testLoggerWithMDCs() { + Runnable errorFn = () -> logger().error(msgWithMDCs, mdcs); + Runnable warnFn = () -> logger().warn(msgWithMDCs, mdcs); + Runnable infoFn = () -> logger().info(msgWithMDCs, mdcs); + Runnable debugFn = () -> logger().debug(msgWithMDCs, mdcs); + Runnable traceFn = () -> logger().trace(msgWithMDCs, mdcs); + List.of( + Pair.of(Level.ERROR, errorFn), + Pair.of(Level.WARN, warnFn), + Pair.of(Level.INFO, infoFn), + 
Pair.of(Level.DEBUG, debugFn), + Pair.of(Level.TRACE, traceFn)).forEach(pair -> { + try { + assert (captureLogOutput(pair.getRight()).matches( + expectedPatternForMsgWithMDCs(pair.getLeft()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testLoggerWithMDCsAndException() { + Throwable exception = new RuntimeException("OOM"); + Runnable errorFn = () -> logger().error(msgWithMDCs, exception, mdcs); + Runnable warnFn = () -> logger().warn(msgWithMDCs, exception, mdcs); + Runnable infoFn = () -> logger().info(msgWithMDCs, exception, mdcs); + Runnable debugFn = () -> logger().debug(msgWithMDCs, exception, mdcs); + Runnable traceFn = () -> logger().trace(msgWithMDCs, exception, mdcs); + List.of( + Pair.of(Level.ERROR, errorFn), + Pair.of(Level.WARN, warnFn), + Pair.of(Level.INFO, infoFn), + Pair.of(Level.DEBUG, debugFn), + Pair.of(Level.TRACE, traceFn)).forEach(pair -> { + try { + assert (captureLogOutput(pair.getRight()).matches( + expectedPatternForMsgWithMDCsAndException(pair.getLeft()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void testLoggerWithMDCValueIsNull() { + Runnable errorFn = () -> logger().error(msgWithMDC, executorIDMDCValueIsNull); + Runnable warnFn = () -> logger().warn(msgWithMDC, executorIDMDCValueIsNull); + Runnable infoFn = () -> logger().info(msgWithMDC, executorIDMDCValueIsNull); + Runnable debugFn = () -> logger().debug(msgWithMDC, executorIDMDCValueIsNull); + Runnable traceFn = () -> logger().trace(msgWithMDC, executorIDMDCValueIsNull); + List.of( + Pair.of(Level.ERROR, errorFn), + Pair.of(Level.WARN, warnFn), + Pair.of(Level.INFO, infoFn), + Pair.of(Level.DEBUG, debugFn), + Pair.of(Level.TRACE, traceFn)).forEach(pair -> { + try { + assert (captureLogOutput(pair.getRight()).matches( + expectedPatternForMsgWithMDCValueIsNull(pair.getLeft()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Test + public void 
testLoggerWithExternalSystemCustomLogKey() { + Runnable errorFn = () -> logger().error("{}", externalSystemCustomLog); + Runnable warnFn = () -> logger().warn("{}", externalSystemCustomLog); + Runnable infoFn = () -> logger().info("{}", externalSystemCustomLog); + Runnable debugFn = () -> logger().debug("{}", externalSystemCustomLog); + Runnable traceFn = () -> logger().trace("{}", externalSystemCustomLog); + List.of( + Pair.of(Level.ERROR, errorFn), + Pair.of(Level.WARN, warnFn), + Pair.of(Level.INFO, infoFn), + Pair.of(Level.DEBUG, debugFn), + Pair.of(Level.TRACE, traceFn)).forEach(pair -> { + try { + assert (captureLogOutput(pair.getRight()).matches( + expectedPatternForExternalSystemCustomLogKey(pair.getLeft()))); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } +} diff --git a/common/utils/src/test/java/org/apache/spark/util/PatternLoggerSuite.java b/common/utils/src/test/java/org/apache/spark/util/PatternLoggerSuite.java new file mode 100644 index 000000000000..13b6a1d05470 --- /dev/null +++ b/common/utils/src/test/java/org/apache/spark/util/PatternLoggerSuite.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Tests the Java {@code Logger} against the plain-text PatternLayout appender
 * (see log4j2.properties), which writes to target/pattern.log. Each expected-pattern
 * method returns a regex; <level> and <className> placeholders are substituted first.
 */
public class PatternLoggerSuite extends LoggerSuiteBase {

  private static final Logger LOGGER = LoggerFactory.getLogger(PatternLoggerSuite.class);

  // Substitutes the <level>/<className> placeholders; the rest of `msg` is used as a
  // regex verbatim (so ".*" prefixes match the timestamp portion of each line).
  private String toRegexPattern(Level level, String msg) {
    return msg
      .replace("<level>", level.toString())
      .replace("<className>", className());
  }

  @Override
  Logger logger() {
    return LOGGER;
  }

  @Override
  String className() {
    return PatternLoggerSuite.class.getSimpleName();
  }

  @Override
  String logFilePath() {
    return "target/pattern.log";
  }

  @Override
  String expectedPatternForBasicMsg(Level level) {
    return toRegexPattern(level, ".*<level> <className>: This is a log message\n");
  }

  @Override
  String expectedPatternForBasicMsgWithException(Level level) {
    // "[\s\S]*" swallows the multi-line stack trace that follows the message.
    return toRegexPattern(level, """
      .*<level> <className>: This is a log message
      [\\s\\S]*""");
  }

  @Override
  String expectedPatternForMsgWithMDC(Level level) {
    return toRegexPattern(level, ".*<level> <className>: Lost executor 1.\n");
  }

  @Override
  String expectedPatternForMsgWithMDCs(Level level) {
    return toRegexPattern(level,
      ".*<level> <className>: Lost executor 1, reason: the shuffle data is too large\n");
  }

  @Override
  String expectedPatternForMsgWithMDCsAndException(Level level) {
    return toRegexPattern(level,"""
      .*<level> <className>: Lost executor 1, reason: the shuffle data is too large
      [\\s\\S]*""");
  }

  @Override
  String expectedPatternForMsgWithMDCValueIsNull(Level level) {
    // A null MDC value is rendered by the formatter as the literal string "null".
    return toRegexPattern(level, ".*<level> <className>: Lost executor null.\n");
  }

  @Override
  String expectedPatternForExternalSystemCustomLogKey(Level level) {
    return toRegexPattern(level, ".*<level> <className>: External system custom log message.\n");
  }
}
a/common/utils/src/test/java/org/apache/spark/util/StructuredLoggerSuite.java b/common/utils/src/test/java/org/apache/spark/util/StructuredLoggerSuite.java new file mode 100644 index 000000000000..c1b31bf68a7d --- /dev/null +++ b/common/utils/src/test/java/org/apache/spark/util/StructuredLoggerSuite.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Tests the Java {@code Logger} against the structured (JSON-lines) appender
 * (see log4j2.properties), which writes to target/structured.log. Each expected-pattern
 * method holds a pretty-printed JSON template that is compacted and converted into a
 * regex before matching a single emitted log line.
 */
public class StructuredLoggerSuite extends LoggerSuiteBase {

  private static final Logger LOGGER = LoggerFactory.getLogger(StructuredLoggerSuite.class);

  // Shared, thread-safe Jackson mapper; used only to compact the JSON templates below.
  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

  // Compacts the JSON template via Jackson, then turns it into a regex:
  // <level>/<className> become literal values, <timestamp> matches any quoted content,
  // "<stacktrace>" (quotes included) matches anything, and '{' is escaped so the JSON
  // braces are treated literally by the regex engine. A trailing newline matches the
  // line terminator of the JSON-lines output.
  private String compactAndToRegexPattern(Level level, String json) {
    try {
      return JSON_MAPPER.readTree(json).toString()
        .replace("<level>", level.toString())
        .replace("<className>", className())
        .replace("<timestamp>", "[^\"]+")
        .replace("\"<stacktrace>\"", ".*")
        .replace("{", "\\{") + "\n";
    } catch (JsonProcessingException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  Logger logger() {
    return LOGGER;
  }

  @Override
  String className() {
    return StructuredLoggerSuite.class.getSimpleName();
  }

  @Override
  String logFilePath() {
    return "target/structured.log";
  }

  @Override
  String expectedPatternForBasicMsg(Level level) {
    return compactAndToRegexPattern(level, """
      {
        "ts": "<timestamp>",
        "level": "<level>",
        "msg": "This is a log message",
        "logger": "<className>"
      }""");
  }

  @Override
  String expectedPatternForBasicMsgWithException(Level level) {
    return compactAndToRegexPattern(level, """
      {
        "ts": "<timestamp>",
        "level": "<level>",
        "msg": "This is a log message",
        "exception": {
          "class": "java.lang.RuntimeException",
          "msg": "OOM",
          "stacktrace": "<stacktrace>"
        },
        "logger": "<className>"
      }""");
  }

  @Override
  String expectedPatternForMsgWithMDC(Level level) {
    // The MDC key/value appears under "context" when structured logging is enabled.
    return compactAndToRegexPattern(level, """
      {
        "ts": "<timestamp>",
        "level": "<level>",
        "msg": "Lost executor 1.",
        "context": {
          "executor_id": "1"
        },
        "logger": "<className>"
      }""");
  }

  @Override
  String expectedPatternForMsgWithMDCs(Level level) {
    return compactAndToRegexPattern(level, """
      {
        "ts": "<timestamp>",
        "level": "<level>",
        "msg": "Lost executor 1, reason: the shuffle data is too large",
        "context": {
          "executor_id": "1",
          "reason": "the shuffle data is too large"
        },
        "logger": "<className>"
      }""");
  }

  @Override
  String expectedPatternForMsgWithMDCsAndException(Level level) {
    return compactAndToRegexPattern(level, """
      {
        "ts": "<timestamp>",
        "level": "<level>",
        "msg": "Lost executor 1, reason: the shuffle data is too large",
        "context": {
          "executor_id": "1",
          "reason": "the shuffle data is too large"
        },
        "exception": {
          "class": "java.lang.RuntimeException",
          "msg": "OOM",
          "stacktrace": "<stacktrace>"
        },
        "logger": "<className>"
      }""");
  }

  @Override
  String expectedPatternForMsgWithMDCValueIsNull(Level level) {
    // A null MDC value is "null" in the formatted message but JSON null in the context.
    return compactAndToRegexPattern(level, """
      {
        "ts": "<timestamp>",
        "level": "<level>",
        "msg": "Lost executor null.",
        "context": {
          "executor_id": null
        },
        "logger": "<className>"
      }""");
  }

  @Override
  String expectedPatternForExternalSystemCustomLogKey(Level level) {
    return compactAndToRegexPattern(level, """
      {
        "ts": "<timestamp>",
        "level": "<level>",
        "msg": "External system custom log message.",
        "context": {
          "custom_log_key": "External system custom log message."
        },
        "logger": "<className>"
      }""");
  }
}
a/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/StructuredLoggingSuite.scala @@ -147,7 +147,7 @@ trait LoggingSuiteBase } private val externalSystemCustomLog = - log"${MDC(CUSTOM_LOG_KEY, "External system custom log message.")}" + log"${MDC(CustomLogKeys.CUSTOM_LOG_KEY, "External system custom log message.")}" test("Logging with external system custom LogKey") { Seq( (Level.ERROR, () => logError(externalSystemCustomLog)), @@ -306,5 +306,7 @@ class StructuredLoggingSuite extends LoggingSuiteBase { } } -// External system custom LogKey must be `extends LogKey` -case object CUSTOM_LOG_KEY extends LogKey +object CustomLogKeys { + // External system custom LogKey must be `extends LogKey` + case object CUSTOM_LOG_KEY extends LogKey +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org