yashmayya commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1360494846


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * <p>
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+    private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+    /**
+     * Log4j uses "root" (case-insensitive) as name of the root logger.
+     */
+    private static final String ROOT_LOGGER_NAME = "root";
+
+    private final Time time;
+    private final Map<String, Long> lastModifiedTimes;
+
+    public Loggers(Time time) {
+        this.time = time;
+        this.lastModifiedTimes = new HashMap<>();
+    }
+
+    /**
+     * Retrieve the current level for a single logger.
+     * @param logger the name of the logger to retrieve the level for; may not 
be null
+     * @return the current level (falling back on the effective level if 
necessary) of the logger,
+     * or null if no logger with the specified name exists
+     */
+    public synchronized LoggerLevel level(String logger) {
+        Objects.requireNonNull(logger, "Logger may not be null");
+
+        org.apache.log4j.Logger foundLogger = null;
+        if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) {
+            foundLogger = rootLogger();
+        } else {
+            Enumeration<org.apache.log4j.Logger> en = currentLoggers();
+            // search within existing loggers for the given name.
+            // using LogManger.getLogger() will create a logger if it doesn't 
exist
+            // (potential leak since these don't get cleaned up).
+            while (en.hasMoreElements()) {
+                org.apache.log4j.Logger l = en.nextElement();
+                if (logger.equals(l.getName())) {
+                    foundLogger = l;
+                    break;
+                }
+            }
+        }
+
+        if (foundLogger == null) {
+            log.warn("Unable to find level for logger {}", logger);
+            return null;
+        }
+
+        return loggerLevel(foundLogger);
+    }
+
+    /**
+     * Retrieve the current levels of all known loggers
+     * @return the levels of all known loggers; may be empty, but never null
+     */
+    public synchronized Map<String, LoggerLevel> allLevels() {
+        Map<String, LoggerLevel> result = new TreeMap<>();
+
+        Enumeration<org.apache.log4j.Logger> enumeration = currentLoggers();
+        Collections.list(enumeration)
+                .stream()
+                .filter(logger -> logger.getLevel() != null)
+                .forEach(logger -> result.put(logger.getName(), 
loggerLevel(logger)));
+
+        org.apache.log4j.Logger root = rootLogger();
+        if (root.getLevel() != null) {
+            result.put(ROOT_LOGGER_NAME, loggerLevel(root));
+        }
+
+        return result;
+    }
+
+    /**
+     * Set the level for the specified logger and all of its children
+     * @param namespace the name of the logger to adjust along with its 
children; may not be nul

Review Comment:
   ```suggestion
        * @param namespace the name of the logger to adjust along with its 
children; may not be null
   ```
   nit



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -917,4 +924,27 @@ public void resetConnectorOffsets(String connName, 
Callback<Message> callback) {
      * @param cb callback to invoke upon completion
      */
     protected abstract void modifyConnectorOffsets(String connName, 
Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);
+
+    @Override
+    public LoggerLevel loggerLevel(String logger) {
+        return loggers.level(logger);
+    }
+
+    @Override
+    public Map<String, LoggerLevel> allLoggerLevels() {
+        return loggers.allLevels();
+    }
+
+    @Override
+    public List<String> setWorkerLoggerLevel(String namespace, String 
desiredLevelStr) {
+        Level level = Level.toLevel(desiredLevelStr.toUpperCase(Locale.ROOT), 
null);
+
+        if (level == null) {
+            log.warn("Ignoring request to set invalid level '{}' for namespace 
{}", desiredLevelStr, namespace);
+            return Collections.emptyList();
+        }

Review Comment:
   Hm I guess this also helps in the extremely unlikely case that a new logger 
level is introduced in the future and is set on a Connect cluster mid-upgrade.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java:
##########
@@ -94,122 +88,54 @@ public Response listLoggers() {
     public Response getLogger(final @PathParam("logger") String namedLogger) {
         Objects.requireNonNull(namedLogger, "require non-null name");
 
-        Logger logger = null;
-        if (ROOT_LOGGER_NAME.equalsIgnoreCase(namedLogger)) {
-            logger = rootLogger();
-        } else {
-            Enumeration<Logger> en = currentLoggers();
-            // search within existing loggers for the given name.
-            // using LogManger.getLogger() will create a logger if it doesn't 
exist
-            // (potential leak since these don't get cleaned up).
-            while (en.hasMoreElements()) {
-                Logger l = en.nextElement();
-                if (namedLogger.equals(l.getName())) {
-                    logger = l;
-                    break;
-                }
-            }
-        }
-        if (logger == null) {
+        LoggerLevel loggerLevel = herder.loggerLevel(namedLogger);
+        if (loggerLevel == null)
             throw new NotFoundException("Logger " + namedLogger + " not 
found.");
-        } else {
-            return Response.ok(effectiveLevelToMap(logger)).build();
-        }
-    }
 
+        return Response.ok(loggerLevel).build();
+    }
 
     /**
      * Adjust level of a named logger. If the name corresponds to an ancestor, 
then the log level is applied to all child loggers.
      *
-     * @param namedLogger name of the logger
+     * @param namespace name of the logger
      * @param levelMap a map that is expected to contain one key 'level', and 
a value that is one of the log4j levels:
      *                 DEBUG, ERROR, FATAL, INFO, TRACE, WARN
      * @return names of loggers whose levels were modified
      */
     @PUT
     @Path("/{logger}")
     @Operation(summary = "Set the log level for the specified logger")
-    public Response setLevel(final @PathParam("logger") String namedLogger,
-                             final Map<String, String> levelMap) {
-        String desiredLevelStr = levelMap.get("level");
-        if (desiredLevelStr == null) {
-            throw new BadRequestException("Desired 'level' parameter was not 
specified in request.");
+    @SuppressWarnings("fallthrough")
+    public Response setLevel(final @PathParam("logger") String namespace,
+                             final Map<String, String> levelMap,
+                             @DefaultValue("worker") @QueryParam("scope") 
@Parameter(description = "The scope for the logging modification 
(single-worker, cluster-wide, etc.)") String scope) {
+        if (scope == null) {
+            log.warn("Received null scope in request to adjust logging level; 
will default to {}", WORKER_SCOPE);

Review Comment:
   I guess this is a warning log because the `DefaultValue` annotation on the 
`scope` query param makes this an unexpected case?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -271,6 +271,14 @@ public static String RESTART_KEY(String connectorName) {
             .field(ONLY_FAILED_FIELD_NAME, Schema.BOOLEAN_SCHEMA)
             .build();
 
+    public static final String LOGGER_CLUSTER_PREFIX = "logger-cluster-";
+    public static String LOGGER_CLUSTER_KEY(String namespace) {

Review Comment:
   I know that this is just following the pattern already established in this 
class, but it's pretty unusual to name methods using `SCREAMING_SNAKE_CASE` 
(only used for constants AFAIK). Might be worth standardizing these to 
`camelCase` in either this PR or a separate one.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * <p>
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+    private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+    /**
+     * Log4j uses "root" (case-insensitive) as name of the root logger.
+     */
+    private static final String ROOT_LOGGER_NAME = "root";
+
+    private final Time time;
+    private final Map<String, Long> lastModifiedTimes;
+
+    public Loggers(Time time) {
+        this.time = time;
+        this.lastModifiedTimes = new HashMap<>();
+    }
+
+    /**
+     * Retrieve the current level for a single logger.
+     * @param logger the name of the logger to retrieve the level for; may not 
be null
+     * @return the current level (falling back on the effective level if 
necessary) of the logger,
+     * or null if no logger with the specified name exists
+     */
+    public synchronized LoggerLevel level(String logger) {
+        Objects.requireNonNull(logger, "Logger may not be null");
+
+        org.apache.log4j.Logger foundLogger = null;
+        if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) {
+            foundLogger = rootLogger();
+        } else {
+            Enumeration<org.apache.log4j.Logger> en = currentLoggers();
+            // search within existing loggers for the given name.
+            // using LogManger.getLogger() will create a logger if it doesn't 
exist
+            // (potential leak since these don't get cleaned up).
+            while (en.hasMoreElements()) {
+                org.apache.log4j.Logger l = en.nextElement();
+                if (logger.equals(l.getName())) {
+                    foundLogger = l;
+                    break;
+                }
+            }
+        }
+
+        if (foundLogger == null) {
+            log.warn("Unable to find level for logger {}", logger);
+            return null;
+        }
+
+        return loggerLevel(foundLogger);
+    }
+
+    /**
+     * Retrieve the current levels of all known loggers
+     * @return the levels of all known loggers; may be empty, but never null
+     */
+    public synchronized Map<String, LoggerLevel> allLevels() {
+        Map<String, LoggerLevel> result = new TreeMap<>();
+
+        Enumeration<org.apache.log4j.Logger> enumeration = currentLoggers();
+        Collections.list(enumeration)
+                .stream()
+                .filter(logger -> logger.getLevel() != null)
+                .forEach(logger -> result.put(logger.getName(), 
loggerLevel(logger)));
+
+        org.apache.log4j.Logger root = rootLogger();
+        if (root.getLevel() != null) {
+            result.put(ROOT_LOGGER_NAME, loggerLevel(root));
+        }
+
+        return result;
+    }
+
+    /**
+     * Set the level for the specified logger and all of its children
+     * @param namespace the name of the logger to adjust along with its 
children; may not be nul
+     * @param level the level to set for the logger and its children; may be 
null
+     * @return all loggers that were affected by this action; may be empty, 
but never null
+     */
+    public synchronized List<String> setLevel(String namespace, Level level) {
+        Objects.requireNonNull(namespace, "Logging namespace may not be null");
+
+        log.info("Setting level of namespace {} and children to {}", 
namespace, level);
+        List<org.apache.log4j.Logger> childLoggers = loggers(namespace);
+
+        List<String> result = new ArrayList<>();
+        for (org.apache.log4j.Logger logger: childLoggers) {
+            setLevel(logger, level);
+            result.add(logger.getName());
+        }
+        Collections.sort(result);
+
+        return result;
+    }
+
+    /**
+     * Retrieve all known loggers within a given namespace, creating an 
ancestor logger for that
+     * namespace if one does not already exist
+     * @param namespace the namespace that the loggers should fall under; may 
not be null
+     * @return all loggers that fall under the given namespace; never null, 
and will always contain
+     * at least one logger (the ancestor logger for the namespace)
+     */
+    private synchronized List<org.apache.log4j.Logger> loggers(String 
namespace) {
+        Objects.requireNonNull(namespace, "Logging namespace may not be null");
+
+        if (ROOT_LOGGER_NAME.equalsIgnoreCase(namespace)) {
+            List<org.apache.log4j.Logger> result = 
Collections.list(currentLoggers());
+            result.add(rootLogger());
+            return result;
+        }
+
+        List<org.apache.log4j.Logger> result = new ArrayList<>();
+        org.apache.log4j.Logger ancestorLogger = lookupLogger(namespace);
+        Enumeration<org.apache.log4j.Logger> en = currentLoggers();
+        boolean present = false;
+        while (en.hasMoreElements()) {
+            org.apache.log4j.Logger current = en.nextElement();
+            if (current.getName().startsWith(namespace)) {
+                result.add(current);
+            }
+            if (namespace.equals(current.getName())) {
+                present = true;
+            }
+        }
+
+        if (!present) {
+            result.add(ancestorLogger);
+        }

Review Comment:
   I know this has been copied over from the existing implementation but I'm 
wondering if we can refactor this a little to use something like a set instead 
of using this `present` boolean flag which seems a little clunky?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##########
@@ -122,6 +122,13 @@ public interface ConfigBackingStore {
     default void claimWritePrivileges() {
     }
 
+    /**
+     * Store a new level for the specified logging namespace (and all of its 
children)

Review Comment:
   This makes it sound like a persistent update - I think it might be worth 
clarifying here that it isn't?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * <p>
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+    private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+    /**
+     * Log4j uses "root" (case-insensitive) as name of the root logger.
+     */
+    private static final String ROOT_LOGGER_NAME = "root";
+
+    private final Time time;
+    private final Map<String, Long> lastModifiedTimes;
+
+    public Loggers(Time time) {
+        this.time = time;
+        this.lastModifiedTimes = new HashMap<>();
+    }
+
+    /**
+     * Retrieve the current level for a single logger.
+     * @param logger the name of the logger to retrieve the level for; may not 
be null
+     * @return the current level (falling back on the effective level if 
necessary) of the logger,
+     * or null if no logger with the specified name exists
+     */
+    public synchronized LoggerLevel level(String logger) {
+        Objects.requireNonNull(logger, "Logger may not be null");
+
+        org.apache.log4j.Logger foundLogger = null;
+        if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) {
+            foundLogger = rootLogger();
+        } else {
+            Enumeration<org.apache.log4j.Logger> en = currentLoggers();
+            // search within existing loggers for the given name.
+            // using LogManger.getLogger() will create a logger if it doesn't 
exist
+            // (potential leak since these don't get cleaned up).
+            while (en.hasMoreElements()) {
+                org.apache.log4j.Logger l = en.nextElement();
+                if (logger.equals(l.getName())) {
+                    foundLogger = l;
+                    break;
+                }
+            }
+        }
+
+        if (foundLogger == null) {
+            log.warn("Unable to find level for logger {}", logger);
+            return null;
+        }
+
+        return loggerLevel(foundLogger);
+    }
+
+    /**
+     * Retrieve the current levels of all known loggers
+     * @return the levels of all known loggers; may be empty, but never null
+     */
+    public synchronized Map<String, LoggerLevel> allLevels() {
+        Map<String, LoggerLevel> result = new TreeMap<>();
+
+        Enumeration<org.apache.log4j.Logger> enumeration = currentLoggers();
+        Collections.list(enumeration)
+                .stream()
+                .filter(logger -> logger.getLevel() != null)
+                .forEach(logger -> result.put(logger.getName(), 
loggerLevel(logger)));
+
+        org.apache.log4j.Logger root = rootLogger();
+        if (root.getLevel() != null) {
+            result.put(ROOT_LOGGER_NAME, loggerLevel(root));
+        }
+
+        return result;
+    }
+
+    /**
+     * Set the level for the specified logger and all of its children
+     * @param namespace the name of the logger to adjust along with its 
children; may not be nul
+     * @param level the level to set for the logger and its children; may be 
null

Review Comment:
   > may be null
   
   Should this be "may not be null" instead? The only non-testing call-site for 
this method has a null check before invoking this method. Also, it looks like 
an NPE will be thrown in the call to `setLevel(org.apache.log4j.Logger logger, 
Level level)` if `level` is null.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -917,4 +924,27 @@ public void resetConnectorOffsets(String connName, 
Callback<Message> callback) {
      * @param cb callback to invoke upon completion
      */
     protected abstract void modifyConnectorOffsets(String connName, 
Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);
+
+    @Override
+    public LoggerLevel loggerLevel(String logger) {
+        return loggers.level(logger);
+    }
+
+    @Override
+    public Map<String, LoggerLevel> allLoggerLevels() {
+        return loggers.allLevels();
+    }
+
+    @Override
+    public List<String> setWorkerLoggerLevel(String namespace, String 
desiredLevelStr) {
+        Level level = Level.toLevel(desiredLevelStr.toUpperCase(Locale.ROOT), 
null);
+
+        if (level == null) {
+            log.warn("Ignoring request to set invalid level '{}' for namespace 
{}", desiredLevelStr, namespace);
+            return Collections.emptyList();
+        }

Review Comment:
   We already have a check at the REST API layer to prevent invalid logger 
levels from being set. Is this additional check to guard against garbage 
records being written manually to the config topic?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##########
@@ -164,6 +171,13 @@ interface UpdateListener {
          * @param restartRequest the {@link RestartRequest restart request}
          */
         void onRestartRequest(RestartRequest restartRequest);
+
+        /**
+         * Invoked when a dynamic log level adjustment has been read

Review Comment:
   Similar to the above comment, I think it might be worth calling out here 
that this will only be invoked for new log level updates.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/LoggerLevel.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class LoggerLevel {
+
+    private final String level;
+    private final Long lastModified;
+
+    public LoggerLevel(
+            @JsonProperty("level") String level,
+            @JsonProperty("last_modified") Long lastModified
+    ) {
+        this.level = Objects.requireNonNull(level, "level may not be null");
+        this.lastModified = lastModified;
+    }
+
+    @JsonProperty
+    public String level() {
+        return level;
+    }
+
+    @JsonProperty("last_modified")
+    public Long lastModified() {
+        return lastModified;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        LoggerLevel that = (LoggerLevel) o;
+        return level.equals(that.level) && Objects.equals(lastModified, 
that.lastModified);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(level, lastModified);
+    }
+
+    @Override
+    public String toString() {
+        return "LoggerLevel{"
+                + "level='" + level + '\''

Review Comment:
   nit: why not `"'"` instead of `'\''`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to