showuon commented on code in PR #17373:
URL: https://github.com/apache/kafka/pull/17373#discussion_r1811968367
##########
core/src/main/scala/kafka/utils/Log4jController.scala:
##########
@@ -17,83 +17,86 @@
package kafka.utils
+import org.apache.logging.log4j.core.LoggerContext
+import org.apache.logging.log4j.core.config.Configurator
+import org.apache.logging.log4j.{Level, LogManager}
+
import java.util
import java.util.Locale
-
-import org.apache.kafka.common.utils.Utils
-import org.apache.log4j.{Level, LogManager, Logger}
-
-import scala.collection.mutable
import scala.jdk.CollectionConverters._
object Log4jController {
- val ROOT_LOGGER = "root"
-
- private def resolveLevel(logger: Logger): String = {
- var name = logger.getName
- var level = logger.getLevel
- while (level == null) {
- val index = name.lastIndexOf(".")
- if (index > 0) {
- name = name.substring(0, index)
- val ancestor = existingLogger(name)
- if (ancestor != null) {
- level = ancestor.getLevel
- }
- } else {
- level = existingLogger(ROOT_LOGGER).getLevel
- }
- }
- level.toString
- }
+
+ val ROOT_LOGGER = ""
/**
- * Returns a map of the log4j loggers and their assigned log level.
- * If a logger does not have a log level assigned, we return the root
logger's log level
- */
- def loggers: mutable.Map[String, String] = {
- val logs = new mutable.HashMap[String, String]()
- val rootLoggerLvl = existingLogger(ROOT_LOGGER).getLevel.toString
- logs.put(ROOT_LOGGER, rootLoggerLvl)
-
- val loggers = LogManager.getCurrentLoggers
- while (loggers.hasMoreElements) {
- val logger = loggers.nextElement().asInstanceOf[Logger]
- if (logger != null) {
- logs.put(logger.getName, resolveLevel(logger))
- }
- }
- logs
+ * Returns given logger's parent's (or the first ancestor's) name.
+ *
+ * @throws IllegalArgumentException loggerName is null or empty.
+ */
Review Comment:
Which method is this Javadoc for?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java:
##########
@@ -166,25 +175,32 @@ public void testSetLevel() {
@Test
public void testSetRootLevel() {
- Logger root = logger("root");
- root.setLevel(Level.ERROR);
-
- Logger p = logger("a.b.c.p");
- Logger x = logger("a.b.c.p.X");
- Logger y = logger("a.b.c.p.Y");
- Logger z = logger("a.b.c.p.Z");
- Logger w = logger("a.b.c.s.W");
- x.setLevel(Level.INFO);
- y.setLevel(Level.INFO);
- z.setLevel(Level.INFO);
- w.setLevel(Level.INFO);
+ LoggerContext loggerContext = (LoggerContext)
LogManager.getContext(false);
+ Configuration config = loggerContext.getConfiguration();
+ LoggerConfig rootConfig = new LoggerConfig("", Level.ERROR, false);
+ config.addLogger("", rootConfig);
+ loggerContext.updateLoggers();
+
+ Logger root = LogManager.getLogger("");
Review Comment:
I'm not familiar with log4j2, so I don't understand the empty name here. Why
can't we use `getRootLogger` as above?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -3076,72 +3083,59 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
- @Disabled // To be re-enabled once KAFKA-8779 is resolved
def testIncrementalAlterConfigsForLog4jLogLevels(quorum: String): Unit = {
client = createAdminClient
+ val ancestorLogger = "kafka";
val initialLoggerConfig = describeBrokerLoggers()
- val initialRootLogLevel =
initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value()
- assertEquals(initialRootLogLevel,
initialLoggerConfig.get("kafka.controller.KafkaController").value())
- assertEquals(initialRootLogLevel,
initialLoggerConfig.get("kafka.log.LogCleaner").value())
- assertEquals(initialRootLogLevel,
initialLoggerConfig.get("kafka.server.ReplicaManager").value())
-
- val newRootLogLevel = LogLevelConfig.DEBUG_LOG_LEVEL
- val alterRootLoggerEntry = Seq(
- new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER,
newRootLogLevel), AlterConfigOp.OpType.SET)
+ val initialKafkaLogLevel = initialLoggerConfig.get("kafka").value()
+ assertEquals(initialKafkaLogLevel,
initialLoggerConfig.get("kafka.server.ControllerServer").value())
+ assertEquals(initialKafkaLogLevel,
initialLoggerConfig.get("kafka.log.LogCleaner").value())
+ assertEquals(initialKafkaLogLevel,
initialLoggerConfig.get("kafka.server.ReplicaManager").value())
Review Comment:
Why can't we change the root log level now?
##########
streams/streams-scala/src/test/resources/log4j2.properties:
##########
@@ -16,19 +16,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=INFO, R
+name=StreamsScalaTestConfig
+appenders=console,rolling
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
+appender.console.type=Console
+appender.console.name=A1
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.File=logs/kafka-streams-scala.log
+appender.rolling.type=RollingFile
+appender.rolling.name=R
+appender.rolling.fileName=logs/kafka-streams-scala.log
+appender.rolling.filePattern=logs/kafka-streams-scala.log.%i
+appender.rolling.layout.type=PatternLayout
+appender.rolling.layout.pattern=%-4r [%t] %-5p %c %x - %m%n
-log4j.appender.R.MaxFileSize=100KB
-# Keep one backup file
-log4j.appender.R.MaxBackupIndex=1
+appender.rolling.policies.type=Policies
+appender.rolling.policies.size.type=SizeBasedTriggeringPolicy
+appender.rolling.policies.size.size=100KB
-# A1 uses PatternLayout.
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+appender.rolling.strategy.type=DefaultRolloverStrategy
+appender.rolling.strategy.max=1
+
+rootLogger.level=INFO
+rootLogger.appenderRefs=R
+rootLogger.appenderRef.R.ref=R
Review Comment:
I'm not familiar with log4j2, so I just want to confirm: will this confuse the
settings?
##########
gradle/dependencies.gradle:
##########
@@ -152,7 +152,9 @@ versions += [
// Also make sure the compression levels in
org.apache.kafka.common.record.CompressionType are still valid
zstd: "1.5.6-6",
junitPlatform: "1.10.2",
- hdrHistogram: "2.2.2"
+ hdrHistogram: "2.2.2",
+ log4j2: "2.24.1",
Review Comment:
I think our goal in this PR is to remove the reload4j above. Thanks for
looking into how we can completely remove it!
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -128,7 +124,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
try {
val alterLogLevelsEntries = Seq(
- new ConfigEntry("kafka.controller.KafkaController",
LogLevelConfig.INFO_LOG_LEVEL)
+ new ConfigEntry("kafka.server.ControllerServer",
LogLevelConfig.INFO_LOG_LEVEL)
Review Comment:
This change is for ZK removal?
##########
core/src/main/scala/kafka/utils/Log4jController.scala:
##########
@@ -17,83 +17,86 @@
package kafka.utils
+import org.apache.logging.log4j.core.LoggerContext
+import org.apache.logging.log4j.core.config.Configurator
+import org.apache.logging.log4j.{Level, LogManager}
+
import java.util
import java.util.Locale
-
-import org.apache.kafka.common.utils.Utils
-import org.apache.log4j.{Level, LogManager, Logger}
-
-import scala.collection.mutable
import scala.jdk.CollectionConverters._
object Log4jController {
- val ROOT_LOGGER = "root"
-
- private def resolveLevel(logger: Logger): String = {
Review Comment:
We don't need `resolveLevel` now because log4j2 will do that for us? Do we
have a test for it?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -3264,28 +3255,6 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
def describeBrokerLoggers(): Config =
client.describeConfigs(Collections.singletonList(brokerLoggerConfigResource)).values.get(brokerLoggerConfigResource).get()
- /**
- * Due to the fact that log4j is not re-initialized across tests, changing a
logger's log level persists across test classes.
- * We need to clean up the changes done while testing.
- */
- private def teardownBrokerLoggers(): Unit = {
Review Comment:
OK, this is replaced with `reconfigure()`, right?
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/LoggersTest.java:
##########
@@ -166,25 +175,32 @@ public void testSetLevel() {
@Test
public void testSetRootLevel() {
- Logger root = logger("root");
- root.setLevel(Level.ERROR);
-
- Logger p = logger("a.b.c.p");
- Logger x = logger("a.b.c.p.X");
- Logger y = logger("a.b.c.p.Y");
- Logger z = logger("a.b.c.p.Z");
- Logger w = logger("a.b.c.s.W");
- x.setLevel(Level.INFO);
- y.setLevel(Level.INFO);
- z.setLevel(Level.INFO);
- w.setLevel(Level.INFO);
+ LoggerContext loggerContext = (LoggerContext)
LogManager.getContext(false);
+ Configuration config = loggerContext.getConfiguration();
+ LoggerConfig rootConfig = new LoggerConfig("", Level.ERROR, false);
+ config.addLogger("", rootConfig);
+ loggerContext.updateLoggers();
+
+ Logger root = LogManager.getLogger("");
+ Configurator.setLevel(root, Level.ERROR);
+
+ Logger p = loggerContext.getLogger("a.b.c.p");
+ Logger x = loggerContext.getLogger("a.b.c.p.X");
+ Logger y = loggerContext.getLogger("a.b.c.p.Y");
+ Logger z = loggerContext.getLogger("a.b.c.p.Z");
+ Logger w = loggerContext.getLogger("a.b.c.s.W");
+ Configurator.setLevel(p, null);
+ Configurator.setLevel(x, Level.INFO);
+ Configurator.setLevel(y, Level.INFO);
+ Configurator.setLevel(z, Level.INFO);
+ Configurator.setLevel(w, Level.INFO);
Loggers loggers = new TestLoggers(root, x, y, z, w);
- List<String> modified = loggers.setLevel("root", Level.DEBUG);
- assertEquals(Arrays.asList("a.b.c.p.X", "a.b.c.p.Y", "a.b.c.p.Z",
"a.b.c.s.W", "root"), modified);
+ List<String> modified = loggers.setLevel("", Level.DEBUG);
+ assertEquals(Arrays.asList("", "a.b.c.p.X", "a.b.c.p.Y", "a.b.c.p.Z",
"a.b.c.s.W"), modified);
- assertNull(p.getLevel());
+ assertEquals(p.getLevel(), Level.DEBUG);
Review Comment:
Is this behavior change expected?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]