This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 7da9457b36c KAFKA-19260 Move LoggingController to server module (#19687) 7da9457b36c is described below commit 7da9457b36cf9ecccc55746163e74f01feb68418 Author: Yunchi Pang <yunchip...@gmail.com> AuthorDate: Fri May 23 08:39:33 2025 -0700 KAFKA-19260 Move LoggingController to server module (#19687) Move `LoggingController` to server module and rewrite it in java. Reviewers: PoAn Yang <pay...@apache.org>, Ken Huang <s7133...@gmail.com>, Ken Huang <s7133...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- build.gradle | 4 + checkstyle/import-control-server.xml | 3 + .../kafka/server/logger/RuntimeLoggerManager.java | 7 +- .../src/main/scala/kafka/server/ConfigHelper.scala | 5 +- core/src/main/scala/kafka/utils/Logging.scala | 2 +- .../main/scala/kafka/utils/LoggingController.scala | 174 --------------------- .../server/logger/RuntimeLoggerManagerTest.java | 9 +- .../kafka/api/PlaintextAdminIntegrationTest.scala | 3 +- core/src/test/scala/kafka/utils/LoggingTest.scala | 12 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 3 +- .../kafka/server/logger/Log4jCoreController.java | 92 +++++++++++ .../kafka/server/logger/LoggingController.java | 103 ++++++++++++ .../server/logger/LoggingControllerDelegate.java | 28 ++++ .../server/logger/LoggingControllerMBean.java | 26 +++ .../apache/kafka/server/logger/NoOpController.java | 37 +++++ 15 files changed, 310 insertions(+), 198 deletions(-) diff --git a/build.gradle b/build.gradle index 36ced29d0bd..9b756b736f2 100644 --- a/build.gradle +++ b/build.gradle @@ -890,6 +890,9 @@ project(':server') { } dependencies { + compileOnly libs.bndlib + compileOnly libs.spotbugs + implementation project(':clients') implementation project(':metadata') implementation project(':server-common') @@ -902,6 +905,7 @@ project(':server') { implementation libs.jacksonDatabind implementation libs.metrics implementation libs.slf4jApi + implementation log4j2Libs 
testImplementation project(':clients').sourceSets.test.output diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml index 2f96777351a..7fc1a273c6a 100644 --- a/checkstyle/import-control-server.xml +++ b/checkstyle/import-control-server.xml @@ -88,6 +88,9 @@ <allow pkg="org.apache.kafka.network.metrics" /> <allow pkg="org.apache.kafka.storage.internals.log" /> <allow pkg="org.apache.kafka.storage.internals.checkpoint" /> + <allow pkg="org.apache.logging.log4j" /> + <allow pkg="org.apache.logging.log4j.core" /> + <allow pkg="org.apache.logging.log4j.core.config" /> <subpackage name="metrics"> <allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" /> <allow pkg="org.apache.kafka.server.telemetry" /> diff --git a/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java b/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java index e3fa44ff35c..3cb226e0686 100644 --- a/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java +++ b/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java @@ -17,8 +17,6 @@ package kafka.server.logger; -import kafka.utils.LoggingController; - import org.apache.kafka.clients.admin.AlterConfigOp.OpType; import org.apache.kafka.common.config.LogLevelConfig; import org.apache.kafka.common.errors.ClusterAuthorizationException; @@ -27,6 +25,7 @@ import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.server.logger.LoggingController; import org.slf4j.Logger; @@ -131,9 +130,9 @@ public class RuntimeLoggerManager { break; case DELETE: validateLoggerNameExists(loggerName); - if (loggerName.equals(LoggingController.ROOT_LOGGER())) { + if (loggerName.equals(LoggingController.ROOT_LOGGER)) { throw new 
InvalidRequestException("Removing the log level of the " + - LoggingController.ROOT_LOGGER() + " logger is not allowed"); + LoggingController.ROOT_LOGGER + " logger is not allowed"); } break; case APPEND: diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala index 453ee0dc972..9a992a55c74 100644 --- a/core/src/main/scala/kafka/server/ConfigHelper.scala +++ b/core/src/main/scala/kafka/server/ConfigHelper.scala @@ -20,7 +20,7 @@ package kafka.server import kafka.network.RequestChannel import java.util.{Collections, Properties} -import kafka.utils.{LoggingController, Logging} +import kafka.utils.Logging import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource} import org.apache.kafka.common.errors.{ApiException, InvalidRequestException} @@ -35,6 +35,7 @@ import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC} import org.apache.kafka.coordinator.group.GroupConfig import org.apache.kafka.metadata.{ConfigRepository, MetadataCache} import org.apache.kafka.server.config.ServerTopicConfigSynonyms +import org.apache.kafka.server.logger.LoggingController import org.apache.kafka.server.metrics.ClientMetricsConfigs import org.apache.kafka.storage.internals.log.LogConfig @@ -130,7 +131,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo else if (resourceNameToBrokerId(resource.resourceName) != config.brokerId) throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.resourceName}") else - createResponseConfig(LoggingController.loggers, + createResponseConfig(LoggingController.loggers.asScala, (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name) .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id) 
.setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava)) diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala index 7518fecd2f9..e08a6873fc1 100755 --- a/core/src/main/scala/kafka/utils/Logging.scala +++ b/core/src/main/scala/kafka/utils/Logging.scala @@ -18,9 +18,9 @@ package kafka.utils import com.typesafe.scalalogging.Logger +import org.apache.kafka.server.logger.LoggingController import org.slf4j.{LoggerFactory, Marker, MarkerFactory} - object Log4jControllerRegistration { private val logger = Logger(this.getClass.getName) diff --git a/core/src/main/scala/kafka/utils/LoggingController.scala b/core/src/main/scala/kafka/utils/LoggingController.scala deleted file mode 100755 index 9d8de03a6c5..00000000000 --- a/core/src/main/scala/kafka/utils/LoggingController.scala +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package kafka.utils - -import com.typesafe.scalalogging.Logger -import kafka.utils.LoggingController.ROOT_LOGGER -import org.apache.kafka.common.utils.Utils -import org.apache.logging.log4j.core.LoggerContext -import org.apache.logging.log4j.core.config.Configurator -import org.apache.logging.log4j.{Level, LogManager} - -import java.util -import java.util.Locale -import scala.jdk.CollectionConverters._ - - -object LoggingController { - - private val logger = Logger[LoggingController] - - /** - * Note: In Log4j 1, the root logger's name was "root" and Kafka also followed that name for dynamic logging control feature. - * - * The root logger's name is changed in log4j2 to empty string (see: [[LogManager.ROOT_LOGGER_NAME]]) but for backward- - * compatibility. Kafka keeps its original root logger name. It is why here is a dedicated definition for the root logger name. - */ - val ROOT_LOGGER = "root" - - private[this] val delegate: LoggingControllerDelegate = { - try { - new Log4jCoreController - } catch { - case _: ClassCastException | _: LinkageError => - logger.info("No supported logging implementation found. Logging configuration endpoint will be disabled.") - new NoOpController - case e: Exception => - logger.warn("A problem occurred, while initializing the logging controller. Logging configuration endpoint will be disabled.", e) - new NoOpController - } - } - - /** - * Returns a map of the log4j loggers and their assigned log level. - * If a logger does not have a log level assigned, we return the log level of the first ancestor with a level configured. - */ - def loggers: Map[String, String] = delegate.loggers - - /** - * Sets the log level of a particular logger. If the given logLevel is not an available level - * (i.e., one of OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL) it falls back to DEBUG. 
- * - * @see [[Level.toLevel]] - */ - def logLevel(loggerName: String, logLevel: String): Boolean = delegate.logLevel(loggerName, logLevel) - - def unsetLogLevel(loggerName: String): Boolean = delegate.unsetLogLevel(loggerName) - - def loggerExists(loggerName: String): Boolean = delegate.loggerExists(loggerName) -} - -private class NoOpController extends LoggingControllerDelegate { - override def loggers: Map[String, String] = Map.empty - - override def logLevel(loggerName: String, logLevel: String): Boolean = false - - override def unsetLogLevel(loggerName: String): Boolean = false -} - -private class Log4jCoreController extends LoggingControllerDelegate { - private[this] val logContext = LogManager.getContext(false).asInstanceOf[LoggerContext] - - override def loggers: Map[String, String] = { - val rootLoggerLevel = logContext.getRootLogger.getLevel.toString - - // Loggers defined in the configuration - val configured = logContext.getConfiguration.getLoggers.asScala - .values - .filterNot(_.getName.equals(LogManager.ROOT_LOGGER_NAME)) - .map { logger => - logger.getName -> logger.getLevel.toString - }.toMap - - // Loggers actually running - val actual = logContext.getLoggers.asScala - .filterNot(_.getName.equals(LogManager.ROOT_LOGGER_NAME)) - .map { logger => - logger.getName -> logger.getLevel.toString - }.toMap - - (configured ++ actual) + (ROOT_LOGGER -> rootLoggerLevel) - } - - override def logLevel(loggerName: String, logLevel: String): Boolean = { - if (Utils.isBlank(loggerName) || Utils.isBlank(logLevel)) - return false - - val level = Level.toLevel(logLevel.toUpperCase(Locale.ROOT)) - - if (loggerName == ROOT_LOGGER) { - Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, level) - true - } else { - if (loggerExists(loggerName) && level != null) { - Configurator.setLevel(loggerName, level) - true - } - else false - } - } - - override def unsetLogLevel(loggerName: String): Boolean = { - val nullLevel: Level = null - if (loggerName == ROOT_LOGGER) { - 
Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, nullLevel) - true - } else { - if (loggerExists(loggerName)) { - Configurator.setLevel(loggerName, nullLevel) - true - } - else false - } - } -} - -private abstract class LoggingControllerDelegate { - def loggers: Map[String, String] - def logLevel(loggerName: String, logLevel: String): Boolean - def unsetLogLevel(loggerName: String): Boolean - def loggerExists(loggerName: String): Boolean = loggers.contains(loggerName) -} - -/** - * An MBean that allows the user to dynamically alter log4j levels at runtime. - * The companion object contains the singleton instance of this class and - * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization - * of the companion object. - */ -class LoggingController extends LoggingControllerMBean { - - def getLoggers: util.List[String] = { - // we replace scala collection by java collection so mbean client is able to deserialize it without scala library. - new util.ArrayList[String](LoggingController.loggers.map { - case (logger, level) => s"$logger=$level" - }.toSeq.asJava) - } - - def getLogLevel(loggerName: String): String = { - LoggingController.loggers.getOrElse(loggerName, "No such logger.") - } - - def setLogLevel(loggerName: String, level: String): Boolean = LoggingController.logLevel(loggerName, level) -} - -trait LoggingControllerMBean { - def getLoggers: java.util.List[String] - def getLogLevel(logger: String): String - def setLogLevel(logger: String, level: String): Boolean -} diff --git a/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java b/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java index b700e162929..b5c8740639c 100644 --- a/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java +++ b/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java @@ -16,13 +16,12 @@ */ package kafka.server.logger; -import kafka.utils.LoggingController; - import 
org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.AlterConfigOp.OpType; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig; +import org.apache.kafka.server.logger.LoggingController; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -82,18 +81,18 @@ public class RuntimeLoggerManagerTest { @Test public void testValidateSetRootLogLevelConfig() { MANAGER.validateLogLevelConfigs(List.of(new AlterableConfig(). - setName(LoggingController.ROOT_LOGGER()). + setName(LoggingController.ROOT_LOGGER). setConfigOperation(OpType.SET.id()). setValue("TRACE"))); } @Test public void testValidateRemoveRootLogLevelConfigNotAllowed() { - assertEquals("Removing the log level of the " + LoggingController.ROOT_LOGGER() + + assertEquals("Removing the log level of the " + LoggingController.ROOT_LOGGER + " logger is not allowed", Assertions.assertThrows(InvalidRequestException.class, () -> MANAGER.validateLogLevelConfigs(List.of(new AlterableConfig(). - setName(LoggingController.ROOT_LOGGER()). + setName(LoggingController.ROOT_LOGGER). setConfigOperation(OpType.DELETE.id()). 
setValue("")))).getMessage()); } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 1fbc6523a92..11a334bf6e7 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -29,7 +29,7 @@ import java.{time, util} import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import kafka.utils.TestUtils._ -import kafka.utils.{LoggingController, TestInfoUtils, TestUtils} +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.HostResolver import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource @@ -53,6 +53,7 @@ import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs} +import org.apache.kafka.server.logger.LoggingController import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils} import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows} import org.apache.logging.log4j.core.config.Configurator diff --git a/core/src/test/scala/kafka/utils/LoggingTest.scala b/core/src/test/scala/kafka/utils/LoggingTest.scala index 7479f021649..761b276c400 100644 --- a/core/src/test/scala/kafka/utils/LoggingTest.scala +++ b/core/src/test/scala/kafka/utils/LoggingTest.scala @@ -17,6 +17,7 @@ package kafka.utils +import org.apache.kafka.server.logger.LoggingController import java.lang.management.ManagementFactory import javax.management.ObjectName @@ -24,17 +25,8 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import 
org.slf4j.LoggerFactory - class LoggingTest extends Logging { - @Test - def testTypeOfGetLoggers(): Unit = { - val log4jController = new LoggingController - // the return object of getLoggers must be a collection instance from java standard library. - // That enables mbean client to deserialize it without extra libraries. - assertEquals(classOf[java.util.ArrayList[String]], log4jController.getLoggers.getClass) - } - @Test def testLog4jControllerIsRegistered(): Unit = { val mbs = ManagementFactory.getPlatformMBeanServer @@ -42,7 +34,7 @@ class LoggingTest extends Logging { val log4jControllerName = ObjectName.getInstance("kafka:type=kafka.Log4jController") assertTrue(mbs.isRegistered(log4jControllerName), "kafka.utils.Log4jController is not registered") val log4jInstance = mbs.getObjectInstance(log4jControllerName) - assertEquals("kafka.utils.LoggingController", log4jInstance.getClassName) + assertEquals("org.apache.kafka.server.logger.LoggingController", log4jInstance.getClassName) } @Test diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 3ca2b111c08..5155b9f3aed 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -23,7 +23,7 @@ import kafka.network.RequestChannel import kafka.server.QuotaFactory.QuotaManagers import kafka.server.metadata.KRaftMetadataCache import kafka.server.share.SharePartitionManager -import kafka.utils.{CoreUtils, Logging, LoggingController, TestUtils} +import kafka.utils.{CoreUtils, Logging, TestUtils} import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.common._ @@ -90,6 +90,7 @@ import org.apache.kafka.server.{ClientMetricsManager, SimpleApiVersionManager} import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer} import 
org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion} import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs} +import org.apache.kafka.server.logger.LoggingController import org.apache.kafka.server.metrics.ClientMetricsTestUtils import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey} import org.apache.kafka.server.quota.ThrottleCallback diff --git a/server/src/main/java/org/apache/kafka/server/logger/Log4jCoreController.java b/server/src/main/java/org/apache/kafka/server/logger/Log4jCoreController.java new file mode 100644 index 00000000000..e8c9ca28819 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/logger/Log4jCoreController.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.logger; + +import org.apache.kafka.common.utils.Utils; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.LoggerContext; +import org.apache.logging.log4j.core.config.Configurator; +import org.apache.logging.log4j.core.config.LoggerConfig; + +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +class Log4jCoreController implements LoggingControllerDelegate { + private final LoggerContext logContext; + + public Log4jCoreController() { + this.logContext = (LoggerContext) LogManager.getContext(false); + } + + @Override + public Map<String, String> loggers() { + String rootLoggerLevel = logContext.getRootLogger().getLevel().toString(); + + Map<String, String> result = new HashMap<>(); + // Loggers defined in the configuration + for (LoggerConfig logger : logContext.getConfiguration().getLoggers().values()) { + if (!logger.getName().equals(LogManager.ROOT_LOGGER_NAME)) { + result.put(logger.getName(), logger.getLevel().toString()); + } + } + // Loggers actually running + for (Logger logger : logContext.getLoggers()) { + if (!logger.getName().equals(LogManager.ROOT_LOGGER_NAME)) { + result.put(logger.getName(), logger.getLevel().toString()); + } + } + // Add root logger + result.put(LoggingController.ROOT_LOGGER, rootLoggerLevel); + return result; + } + + @Override + public boolean logLevel(String loggerName, String logLevel) { + if (Utils.isBlank(loggerName) || Utils.isBlank(logLevel)) + return false; + + Level level = Level.toLevel(logLevel.toUpperCase(Locale.ROOT)); + + if (loggerName.equals(LoggingController.ROOT_LOGGER)) { + Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, level); + return true; + } + if (loggerExists(loggerName) && level != null) { + Configurator.setLevel(loggerName, level); + return true; + } + return false; + } + + @Override + public boolean unsetLogLevel(String 
loggerName) { + Level nullLevel = null; + if (loggerName.equals(LoggingController.ROOT_LOGGER)) { + Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, nullLevel); + return true; + } + if (loggerExists(loggerName)) { + Configurator.setLevel(loggerName, nullLevel); + return true; + } + return false; + } +} diff --git a/server/src/main/java/org/apache/kafka/server/logger/LoggingController.java b/server/src/main/java/org/apache/kafka/server/logger/LoggingController.java new file mode 100644 index 00000000000..fba75b10c07 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/logger/LoggingController.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.logger; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Map; + +/** + * An MBean that allows the user to dynamically alter log4j levels at runtime. + * The companion object contains the singleton instance of this class and + * registers the MBean. The {@code kafka.utils.Logging} trait forces initialization + * of the companion object. 
+ */ +public class LoggingController implements LoggingControllerMBean { + + private static final Logger LOGGER = LogManager.getLogger(LoggingController.class); + + /** + * Note: In Log4j 1, the root logger's name was "root" and Kafka also followed that name for the dynamic logging control feature. + * The root logger's name was changed in log4j2 to the empty string (see: {@link LogManager#ROOT_LOGGER_NAME}), but for backward compatibility + * Kafka keeps its original root logger name. That is why there is a dedicated definition for the root logger name here. + */ + public static final String ROOT_LOGGER = "root"; + + private static final LoggingControllerDelegate DELEGATE; + + static { + LoggingControllerDelegate tempDelegate; + try { + tempDelegate = new Log4jCoreController(); + } catch (ClassCastException | LinkageError e) { + LOGGER.info("No supported logging implementation found. Logging configuration endpoint will be disabled."); + tempDelegate = new NoOpController(); + } catch (Exception e) { + LOGGER.warn("A problem occurred, while initializing the logging controller. Logging configuration endpoint will be disabled.", e); + tempDelegate = new NoOpController(); + } + DELEGATE = tempDelegate; + } + + /** + * Returns a map of the log4j loggers and their assigned log level. + * If a logger does not have a log level assigned, we return the log level of the first ancestor with a level configured. + */ + public static Map<String, String> loggers() { + return DELEGATE.loggers(); + } + + /** + * Sets the log level of a particular logger. If the given logLevel is not an available level + * (i.e., one of OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL) it falls back to DEBUG. 
+ * + * @see Level#toLevel(String) + */ + public static boolean logLevel(String loggerName, String logLevel) { + return DELEGATE.logLevel(loggerName, logLevel); + } + + public static boolean unsetLogLevel(String loggerName) { + return DELEGATE.unsetLogLevel(loggerName); + } + + public static boolean loggerExists(String loggerName) { + return DELEGATE.loggerExists(loggerName); + } + + @Override + public List<String> getLoggers() { + return LoggingController.loggers() + .entrySet() + .stream() + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .toList(); + } + + @Override + public String getLogLevel(String loggerName) { + return LoggingController.loggers().getOrDefault(loggerName, "No such logger."); + } + + @Override + public boolean setLogLevel(String loggerName, String level) { + return LoggingController.logLevel(loggerName, level); + } +} diff --git a/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerDelegate.java b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerDelegate.java new file mode 100644 index 00000000000..0d73b3aca25 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerDelegate.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.logger; + +import java.util.Map; + +public interface LoggingControllerDelegate { + Map<String, String> loggers(); + boolean logLevel(String loggerName, String logLevel); + boolean unsetLogLevel(String loggerName); + default boolean loggerExists(String loggerName) { + return loggers().containsKey(loggerName); + } +} diff --git a/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java new file mode 100644 index 00000000000..c5e47f2aca4 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.logger; + +import java.util.List; + + +public interface LoggingControllerMBean { + List<String> getLoggers(); + String getLogLevel(String logger); + boolean setLogLevel(String logger, String level); +} \ No newline at end of file diff --git a/server/src/main/java/org/apache/kafka/server/logger/NoOpController.java b/server/src/main/java/org/apache/kafka/server/logger/NoOpController.java new file mode 100644 index 00000000000..b8cf2aa951d --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/logger/NoOpController.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.logger; + +import java.util.Map; + +class NoOpController implements LoggingControllerDelegate { + + @Override + public Map<String, String> loggers() { + return Map.of(); + } + + @Override + public boolean logLevel(String loggerName, String logLevel) { + return false; + } + + @Override + public boolean unsetLogLevel(String loggerName) { + return false; + } +}