This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7da9457b36c KAFKA-19260 Move LoggingController to server module (#19687)
7da9457b36c is described below

commit 7da9457b36cf9ecccc55746163e74f01feb68418
Author: Yunchi Pang <yunchip...@gmail.com>
AuthorDate: Fri May 23 08:39:33 2025 -0700

    KAFKA-19260 Move LoggingController to server module (#19687)
    
    Move `LoggingController` to server module and rewrite it in java.
    
    Reviewers: PoAn Yang <pay...@apache.org>, Ken Huang <s7133...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
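    
    For reference, a minimal usage sketch of the relocated class (illustrative
    only, not part of this commit; the example class name is made up), based on
    the static API added in the diff below:
    
        import org.apache.kafka.server.logger.LoggingController;
    
        public class LoggingControllerExample {
            public static void main(String[] args) {
                // Raise the Kafka "root" logger to DEBUG; returns true if the level was applied.
                boolean changed = LoggingController.logLevel(LoggingController.ROOT_LOGGER, "DEBUG");
                System.out.println("root level changed: " + changed);
    
                // List every known logger together with its effective level.
                LoggingController.loggers().forEach((name, level) -> System.out.println(name + "=" + level));
    
                // Revert the root logger to the level inherited from the log4j2 configuration.
                LoggingController.unsetLogLevel(LoggingController.ROOT_LOGGER);
            }
        }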
---
 build.gradle                                       |   4 +
 checkstyle/import-control-server.xml               |   3 +
 .../kafka/server/logger/RuntimeLoggerManager.java  |   7 +-
 .../src/main/scala/kafka/server/ConfigHelper.scala |   5 +-
 core/src/main/scala/kafka/utils/Logging.scala      |   2 +-
 .../main/scala/kafka/utils/LoggingController.scala | 174 ---------------------
 .../server/logger/RuntimeLoggerManagerTest.java    |   9 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |   3 +-
 core/src/test/scala/kafka/utils/LoggingTest.scala  |  12 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   3 +-
 .../kafka/server/logger/Log4jCoreController.java   |  92 +++++++++++
 .../kafka/server/logger/LoggingController.java     | 103 ++++++++++++
 .../server/logger/LoggingControllerDelegate.java   |  28 ++++
 .../server/logger/LoggingControllerMBean.java      |  26 +++
 .../apache/kafka/server/logger/NoOpController.java |  37 +++++
 15 files changed, 310 insertions(+), 198 deletions(-)

diff --git a/build.gradle b/build.gradle
index 36ced29d0bd..9b756b736f2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -890,6 +890,9 @@ project(':server') {
   }
 
   dependencies {
+    compileOnly libs.bndlib
+    compileOnly libs.spotbugs
+
     implementation project(':clients')
     implementation project(':metadata')
     implementation project(':server-common')
@@ -902,6 +905,7 @@ project(':server') {
     implementation libs.jacksonDatabind
     implementation libs.metrics
     implementation libs.slf4jApi
+    implementation log4j2Libs
 
     testImplementation project(':clients').sourceSets.test.output
 
diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml
index 2f96777351a..7fc1a273c6a 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -88,6 +88,9 @@
     <allow pkg="org.apache.kafka.network.metrics" />
     <allow pkg="org.apache.kafka.storage.internals.log" />
     <allow pkg="org.apache.kafka.storage.internals.checkpoint" />
+    <allow pkg="org.apache.logging.log4j" />
+    <allow pkg="org.apache.logging.log4j.core" />
+    <allow pkg="org.apache.logging.log4j.core.config" />
     <subpackage name="metrics">
       <allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
       <allow pkg="org.apache.kafka.server.telemetry" />
diff --git a/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java b/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java
index e3fa44ff35c..3cb226e0686 100644
--- a/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java
+++ b/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java
@@ -17,8 +17,6 @@
 
 package kafka.server.logger;
 
-import kafka.utils.LoggingController;
-
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
 import org.apache.kafka.common.config.LogLevelConfig;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
@@ -27,6 +25,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.logger.LoggingController;
 
 import org.slf4j.Logger;
 
@@ -131,9 +130,9 @@ public class RuntimeLoggerManager {
                     break;
                 case DELETE:
                     validateLoggerNameExists(loggerName);
-                    if (loggerName.equals(LoggingController.ROOT_LOGGER())) {
+                    if (loggerName.equals(LoggingController.ROOT_LOGGER)) {
                         throw new InvalidRequestException("Removing the log 
level of the " +
-                            LoggingController.ROOT_LOGGER() + " logger is not 
allowed");
+                            LoggingController.ROOT_LOGGER + " logger is not 
allowed");
                     }
                     break;
                 case APPEND:
diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala b/core/src/main/scala/kafka/server/ConfigHelper.scala
index 453ee0dc972..9a992a55c74 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -20,7 +20,7 @@ package kafka.server
 import kafka.network.RequestChannel
 
 import java.util.{Collections, Properties}
-import kafka.utils.{LoggingController, Logging}
+import kafka.utils.Logging
 import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigResource}
 import org.apache.kafka.common.errors.{ApiException, InvalidRequestException}
@@ -35,6 +35,7 @@ import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
 import org.apache.kafka.coordinator.group.GroupConfig
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms
+import org.apache.kafka.server.logger.LoggingController
 import org.apache.kafka.server.metrics.ClientMetricsConfigs
 import org.apache.kafka.storage.internals.log.LogConfig
 
@@ -130,7 +131,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepo
             else if (resourceNameToBrokerId(resource.resourceName) != config.brokerId)
               throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.resourceName}")
             else
-              createResponseConfig(LoggingController.loggers,
+              createResponseConfig(LoggingController.loggers.asScala,
                 (name, value) => new DescribeConfigsResponseData.DescribeConfigsResourceResult().setName(name)
                   .setValue(value.toString).setConfigSource(ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG.id)
                   .setIsSensitive(false).setReadOnly(false).setSynonyms(List.empty.asJava))
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index 7518fecd2f9..e08a6873fc1 100755
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -18,9 +18,9 @@
 package kafka.utils
 
 import com.typesafe.scalalogging.Logger
+import org.apache.kafka.server.logger.LoggingController
 import org.slf4j.{LoggerFactory, Marker, MarkerFactory}
 
-
 object Log4jControllerRegistration {
   private val logger = Logger(this.getClass.getName)
 
diff --git a/core/src/main/scala/kafka/utils/LoggingController.scala b/core/src/main/scala/kafka/utils/LoggingController.scala
deleted file mode 100755
index 9d8de03a6c5..00000000000
--- a/core/src/main/scala/kafka/utils/LoggingController.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import com.typesafe.scalalogging.Logger
-import kafka.utils.LoggingController.ROOT_LOGGER
-import org.apache.kafka.common.utils.Utils
-import org.apache.logging.log4j.core.LoggerContext
-import org.apache.logging.log4j.core.config.Configurator
-import org.apache.logging.log4j.{Level, LogManager}
-
-import java.util
-import java.util.Locale
-import scala.jdk.CollectionConverters._
-
-
-object LoggingController {
-
-  private val logger = Logger[LoggingController]
-
-  /**
-   * Note: In Log4j 1, the root logger's name was "root" and Kafka also followed that name for dynamic logging control feature.
-   *
-   * The root logger's name is changed in log4j2 to empty string (see: [[LogManager.ROOT_LOGGER_NAME]]) but for backward-
-   * compatibility. Kafka keeps its original root logger name. It is why here is a dedicated definition for the root logger name.
-   */
-  val ROOT_LOGGER = "root"
-
-  private[this] val delegate: LoggingControllerDelegate = {
-    try {
-      new Log4jCoreController
-    } catch {
-      case _: ClassCastException | _: LinkageError =>
-        logger.info("No supported logging implementation found. Logging configuration endpoint will be disabled.")
-        new NoOpController
-      case e: Exception =>
-        logger.warn("A problem occurred, while initializing the logging controller. Logging configuration endpoint will be disabled.", e)
-        new NoOpController
-    }
-  }
-
-  /**
-   * Returns a map of the log4j loggers and their assigned log level.
-   * If a logger does not have a log level assigned, we return the log level of the first ancestor with a level configured.
-   */
-  def loggers: Map[String, String] = delegate.loggers
-
-  /**
-   * Sets the log level of a particular logger. If the given logLevel is not an available level
-   * (i.e., one of OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL) it falls back to DEBUG.
-   *
-   * @see [[Level.toLevel]]
-   */
-  def logLevel(loggerName: String, logLevel: String): Boolean = delegate.logLevel(loggerName, logLevel)
-
-  def unsetLogLevel(loggerName: String): Boolean = delegate.unsetLogLevel(loggerName)
-
-  def loggerExists(loggerName: String): Boolean = delegate.loggerExists(loggerName)
-}
-
-private class NoOpController extends LoggingControllerDelegate {
-  override def loggers: Map[String, String] = Map.empty
-
-  override def logLevel(loggerName: String, logLevel: String): Boolean = false
-
-  override def unsetLogLevel(loggerName: String): Boolean = false
-}
-
-private class Log4jCoreController extends LoggingControllerDelegate {
-  private[this] val logContext = LogManager.getContext(false).asInstanceOf[LoggerContext]
-
-  override def loggers: Map[String, String] = {
-    val rootLoggerLevel = logContext.getRootLogger.getLevel.toString
-
-    // Loggers defined in the configuration
-    val configured = logContext.getConfiguration.getLoggers.asScala
-      .values
-      .filterNot(_.getName.equals(LogManager.ROOT_LOGGER_NAME))
-      .map { logger =>
-        logger.getName -> logger.getLevel.toString
-      }.toMap
-
-    // Loggers actually running
-    val actual = logContext.getLoggers.asScala
-      .filterNot(_.getName.equals(LogManager.ROOT_LOGGER_NAME))
-      .map { logger =>
-        logger.getName -> logger.getLevel.toString
-      }.toMap
-
-    (configured ++ actual) + (ROOT_LOGGER -> rootLoggerLevel)
-  }
-
-  override def logLevel(loggerName: String, logLevel: String): Boolean = {
-    if (Utils.isBlank(loggerName) || Utils.isBlank(logLevel))
-      return false
-
-    val level = Level.toLevel(logLevel.toUpperCase(Locale.ROOT))
-
-    if (loggerName == ROOT_LOGGER) {
-      Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, level)
-      true
-    } else {
-      if (loggerExists(loggerName) && level != null) {
-        Configurator.setLevel(loggerName, level)
-        true
-      }
-      else false
-    }
-  }
-
-  override def unsetLogLevel(loggerName: String): Boolean = {
-    val nullLevel: Level = null
-    if (loggerName == ROOT_LOGGER) {
-      Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, nullLevel)
-      true
-    } else {
-      if (loggerExists(loggerName)) {
-        Configurator.setLevel(loggerName, nullLevel)
-        true
-      }
-      else false
-    }
-  }
-}
-
-private abstract class LoggingControllerDelegate {
-  def loggers: Map[String, String]
-  def logLevel(loggerName: String, logLevel: String): Boolean
-  def unsetLogLevel(loggerName: String): Boolean
-  def loggerExists(loggerName: String): Boolean = loggers.contains(loggerName)
-}
-
-/**
- * An MBean that allows the user to dynamically alter log4j levels at runtime.
- * The companion object contains the singleton instance of this class and
- * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization
- * of the companion object.
- */
-class LoggingController extends LoggingControllerMBean {
-
-  def getLoggers: util.List[String] = {
-    // we replace scala collection by java collection so mbean client is able to deserialize it without scala library.
-    new util.ArrayList[String](LoggingController.loggers.map {
-      case (logger, level) => s"$logger=$level"
-    }.toSeq.asJava)
-  }
-
-  def getLogLevel(loggerName: String): String = {
-    LoggingController.loggers.getOrElse(loggerName, "No such logger.")
-  }
-
-  def setLogLevel(loggerName: String, level: String): Boolean = LoggingController.logLevel(loggerName, level)
-}
-
-trait LoggingControllerMBean {
-  def getLoggers: java.util.List[String]
-  def getLogLevel(logger: String): String
-  def setLogLevel(logger: String, level: String): Boolean
-}
diff --git a/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java b/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java
index b700e162929..b5c8740639c 100644
--- a/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java
+++ b/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java
@@ -16,13 +16,12 @@
  */
 package kafka.server.logger;
 
-import kafka.utils.LoggingController;
-
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
+import org.apache.kafka.server.logger.LoggingController;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -82,18 +81,18 @@ public class RuntimeLoggerManagerTest {
     @Test
     public void testValidateSetRootLogLevelConfig() {
         MANAGER.validateLogLevelConfigs(List.of(new AlterableConfig().
-                setName(LoggingController.ROOT_LOGGER()).
+                setName(LoggingController.ROOT_LOGGER).
                 setConfigOperation(OpType.SET.id()).
                 setValue("TRACE")));
     }
 
     @Test
     public void testValidateRemoveRootLogLevelConfigNotAllowed() {
-        assertEquals("Removing the log level of the " + 
LoggingController.ROOT_LOGGER() +
+        assertEquals("Removing the log level of the " + 
LoggingController.ROOT_LOGGER +
             " logger is not allowed",
             Assertions.assertThrows(InvalidRequestException.class,
                 () -> MANAGER.validateLogLevelConfigs(List.of(new AlterableConfig().
-                    setName(LoggingController.ROOT_LOGGER()).
+                    setName(LoggingController.ROOT_LOGGER).
                     setConfigOperation(OpType.DELETE.id()).
                     setValue("")))).getMessage());
     }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 1fbc6523a92..11a334bf6e7 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -29,7 +29,7 @@ import java.{time, util}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils._
-import kafka.utils.{LoggingController, TestInfoUtils, TestUtils}
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.HostResolver
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource
@@ -53,6 +53,7 @@ import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs}
+import org.apache.kafka.server.logger.LoggingController
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils}
 import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows}
 import org.apache.logging.log4j.core.config.Configurator
diff --git a/core/src/test/scala/kafka/utils/LoggingTest.scala b/core/src/test/scala/kafka/utils/LoggingTest.scala
index 7479f021649..761b276c400 100644
--- a/core/src/test/scala/kafka/utils/LoggingTest.scala
+++ b/core/src/test/scala/kafka/utils/LoggingTest.scala
@@ -17,6 +17,7 @@
 
 package kafka.utils
 
+import org.apache.kafka.server.logger.LoggingController
 import java.lang.management.ManagementFactory
 
 import javax.management.ObjectName
@@ -24,17 +25,8 @@ import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.slf4j.LoggerFactory
 
-
 class LoggingTest extends Logging {
 
-  @Test
-  def testTypeOfGetLoggers(): Unit = {
-    val log4jController = new LoggingController
-    // the return object of getLoggers must be a collection instance from java standard library.
-    // That enables mbean client to deserialize it without extra libraries.
-    assertEquals(classOf[java.util.ArrayList[String]], log4jController.getLoggers.getClass)
-  }
-
   @Test
   def testLog4jControllerIsRegistered(): Unit = {
     val mbs = ManagementFactory.getPlatformMBeanServer
@@ -42,7 +34,7 @@ class LoggingTest extends Logging {
     val log4jControllerName = ObjectName.getInstance("kafka:type=kafka.Log4jController")
     assertTrue(mbs.isRegistered(log4jControllerName), "kafka.utils.Log4jController is not registered")
     val log4jInstance = mbs.getObjectInstance(log4jControllerName)
-    assertEquals("kafka.utils.LoggingController", log4jInstance.getClassName)
+    assertEquals("org.apache.kafka.server.logger.LoggingController", log4jInstance.getClassName)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 3ca2b111c08..5155b9f3aed 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -23,7 +23,7 @@ import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.metadata.KRaftMetadataCache
 import kafka.server.share.SharePartitionManager
-import kafka.utils.{CoreUtils, Logging, LoggingController, TestUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
 import org.apache.kafka.common._
@@ -90,6 +90,7 @@ import org.apache.kafka.server.{ClientMetricsManager, SimpleApiVersionManager}
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
 import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, ShareVersion, StreamsVersion, TransactionVersion}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
+import org.apache.kafka.server.logger.LoggingController
 import org.apache.kafka.server.metrics.ClientMetricsTestUtils
 import org.apache.kafka.server.share.{CachedSharePartition, ErroneousAndValidPartitionData, SharePartitionKey}
 import org.apache.kafka.server.quota.ThrottleCallback
diff --git a/server/src/main/java/org/apache/kafka/server/logger/Log4jCoreController.java b/server/src/main/java/org/apache/kafka/server/logger/Log4jCoreController.java
new file mode 100644
index 00000000000..e8c9ca28819
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/logger/Log4jCoreController.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.logger;
+
+import org.apache.kafka.common.utils.Utils;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+class Log4jCoreController implements LoggingControllerDelegate {
+    private final LoggerContext logContext;
+
+    public Log4jCoreController() {
+        this.logContext = (LoggerContext) LogManager.getContext(false);
+    }
+
+    @Override
+    public Map<String, String> loggers() {
+        String rootLoggerLevel = logContext.getRootLogger().getLevel().toString();
+
+        Map<String, String> result = new HashMap<>();
+        // Loggers defined in the configuration
+        for (LoggerConfig logger : logContext.getConfiguration().getLoggers().values()) {
+            if (!logger.getName().equals(LogManager.ROOT_LOGGER_NAME)) {
+                result.put(logger.getName(), logger.getLevel().toString());
+            }
+        }
+        // Loggers actually running
+        for (Logger logger : logContext.getLoggers()) {
+            if (!logger.getName().equals(LogManager.ROOT_LOGGER_NAME)) {
+                result.put(logger.getName(), logger.getLevel().toString());
+            }
+        }
+        // Add root logger
+        result.put(LoggingController.ROOT_LOGGER, rootLoggerLevel);
+        return result;
+    }
+
+    @Override
+    public boolean logLevel(String loggerName, String logLevel) {
+        if (Utils.isBlank(loggerName) || Utils.isBlank(logLevel))
+            return false;
+
+        Level level = Level.toLevel(logLevel.toUpperCase(Locale.ROOT));
+
+        if (loggerName.equals(LoggingController.ROOT_LOGGER)) {
+            Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, level);
+            return true;
+        }
+        if (loggerExists(loggerName) && level != null) {
+            Configurator.setLevel(loggerName, level);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean unsetLogLevel(String loggerName) {
+        Level nullLevel = null;
+        if (loggerName.equals(LoggingController.ROOT_LOGGER)) {
+            Configurator.setLevel(LogManager.ROOT_LOGGER_NAME, nullLevel);
+            return true;
+        }
+        if (loggerExists(loggerName)) {
+            Configurator.setLevel(loggerName, nullLevel);
+            return true;
+        }
+        return false;
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/logger/LoggingController.java b/server/src/main/java/org/apache/kafka/server/logger/LoggingController.java
new file mode 100644
index 00000000000..fba75b10c07
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/logger/LoggingController.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.logger;
+
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An MBean that allows the user to dynamically alter log4j levels at runtime.
+ * The companion object contains the singleton instance of this class and
+ * registers the MBean. The {@code kafka.utils.Logging} trait forces initialization
+ * of the companion object.
+ */
+public class LoggingController implements LoggingControllerMBean {
+
+    private static final Logger LOGGER = LogManager.getLogger(LoggingController.class);
+
+    /**
+     * Note: In Log4j 1, the root logger's name was "root" and Kafka also followed that name for dynamic logging control feature.
+     * The root logger's name is changed in log4j2 to empty string (see: {@link LogManager#ROOT_LOGGER_NAME}) but for backward-compatibility.
+     * Kafka keeps its original root logger name. It is why here is a dedicated definition for the root logger name.
+     */
+    public static final String ROOT_LOGGER = "root";
+
+    private static final LoggingControllerDelegate DELEGATE;
+
+    static {
+        LoggingControllerDelegate tempDelegate;
+        try {
+            tempDelegate = new Log4jCoreController();
+        } catch (ClassCastException | LinkageError e) {
+            LOGGER.info("No supported logging implementation found. Logging configuration endpoint will be disabled.");
+            tempDelegate = new NoOpController();
+        } catch (Exception e) {
+            LOGGER.warn("A problem occurred, while initializing the logging controller. Logging configuration endpoint will be disabled.", e);
+            tempDelegate = new NoOpController();
+        }
+        DELEGATE = tempDelegate;
+    }
+
+    /**
+     * Returns a map of the log4j loggers and their assigned log level.
+     * If a logger does not have a log level assigned, we return the log level of the first ancestor with a level configured.
+     */
+    public static Map<String, String> loggers() {
+        return DELEGATE.loggers();
+    }
+
+    /**
+     * Sets the log level of a particular logger. If the given logLevel is not an available level
+     * (i.e., one of OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL) it falls back to DEBUG.
+     *
+     * @see Level#toLevel(String, Level)
+     */
+    public static boolean logLevel(String loggerName, String logLevel) {
+        return DELEGATE.logLevel(loggerName, logLevel);
+    }
+
+    public static boolean unsetLogLevel(String loggerName) {
+        return DELEGATE.unsetLogLevel(loggerName);
+    }
+
+    public static boolean loggerExists(String loggerName) {
+        return DELEGATE.loggerExists(loggerName);
+    }
+
+    @Override
+    public List<String> getLoggers() {
+        return LoggingController.loggers()
+                .entrySet()
+                .stream()
+                .map(entry -> entry.getKey() + "=" + entry.getValue())
+                .toList();
+    }
+
+    @Override
+    public String getLogLevel(String loggerName) {
+        return LoggingController.loggers().getOrDefault(loggerName, "No such logger.");
+    }
+
+    @Override
+    public boolean setLogLevel(String loggerName, String level) {
+        return LoggingController.logLevel(loggerName, level);
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerDelegate.java b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerDelegate.java
new file mode 100644
index 00000000000..0d73b3aca25
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerDelegate.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.logger;
+
+import java.util.Map;
+
+public interface LoggingControllerDelegate {
+    Map<String, String> loggers();
+    boolean logLevel(String loggerName, String logLevel);
+    boolean unsetLogLevel(String loggerName);
+    default boolean loggerExists(String loggerName) {
+        return loggers().containsKey(loggerName);
+    }
+}
diff --git a/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
new file mode 100644
index 00000000000..c5e47f2aca4
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/logger/LoggingControllerMBean.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.logger;
+
+import java.util.List;
+
+
+public interface LoggingControllerMBean {
+    List<String> getLoggers();
+    String getLogLevel(String logger);
+    boolean setLogLevel(String logger, String level);
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/kafka/server/logger/NoOpController.java b/server/src/main/java/org/apache/kafka/server/logger/NoOpController.java
new file mode 100644
index 00000000000..b8cf2aa951d
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/logger/NoOpController.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.logger;
+
+import java.util.Map;
+
+class NoOpController implements LoggingControllerDelegate {
+
+    @Override
+    public Map<String, String> loggers() {
+        return Map.of();
+    }
+
+    @Override
+    public boolean logLevel(String loggerName, String logLevel) {
+        return false;
+    }
+
+    @Override
+    public boolean unsetLogLevel(String loggerName) {
+        return false;
+    }
+}
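
For completeness, a hedged sketch of driving the relocated MBean over JMX from inside
a broker JVM (illustrative only, not part of this commit; it assumes the MBean has been
registered under the historical object name asserted in LoggingTest above):

    import java.lang.management.ManagementFactory;

    import javax.management.JMX;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    import org.apache.kafka.server.logger.LoggingControllerMBean;

    public class LoggingMBeanExample {
        public static void main(String[] args) throws Exception {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            // The MBean keeps its historical object name for compatibility.
            ObjectName name = ObjectName.getInstance("kafka:type=kafka.Log4jController");
            // Proxy the standard MBean interface introduced in this commit.
            LoggingControllerMBean controller = JMX.newMBeanProxy(mbs, name, LoggingControllerMBean.class);
            controller.setLogLevel("kafka.server", "DEBUG");
            controller.getLoggers().forEach(System.out::println);
        }
    }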
