C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1360768631


##########
checkstyle/suppressions.xml:
##########
@@ -139,6 +139,8 @@
               files="Worker(SinkTask|SourceTask|Coordinator).java"/>
     <suppress checks="ParameterNumber"
               files="(ConfigKeyInfo|DistributedHerder).java"/>
+    <suppress checks="DefaultComesLast"
+              files="LoggingResource.java" />

Review Comment:
   Switching on enums still requires a default case; there's a good explanation 
on 
[StackOverflow](https://stackoverflow.com/questions/5013194/why-is-default-required-for-a-switch-on-an-enum)
 for why.



##########
tests/kafkatest/tests/connect/connect_distributed_test.py:
##########
@@ -375,6 +381,159 @@ def test_pause_state_persistent(self, 
exactly_once_source, connect_protocol, met
             wait_until(lambda: self.is_paused(self.source, node), 
timeout_sec=120,
                        err_msg="Failed to see connector startup in PAUSED 
state")
 
+    @cluster(num_nodes=5)
+    def test_dynamic_logging(self):
+        """
+        Test out the REST API for dynamically adjusting logging levels, on 
both a single-worker and cluster-wide basis.
+        """
+
+        self.setup_services(num_workers=3)
+        self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        worker = self.cc.nodes[0]
+        prior_all_loggers = self.cc.get_all_loggers(worker)
+        self.logger.debug("Listed all loggers via REST API: %s", 
str(prior_all_loggers))
+        assert prior_all_loggers is not None
+        assert 'root' in prior_all_loggers
+        # We need root and at least one other namespace (the other namespace 
is checked
+        # later on to make sure that it hasn't changed)
+        assert len(prior_all_loggers) >= 2
+        for logger in prior_all_loggers.values():
+            assert logger['last_modified'] is None
+
+        namespace = None
+        for logger in prior_all_loggers.keys():
+            if logger != 'root':
+                namespace = logger
+                break
+        assert namespace is not None
+
+        initial_level = self.cc.get_logger(worker, namespace)['level'].upper()
+        # Make sure we pick a different one than what's already set for that 
namespace
+        new_level = 'INFO' if initial_level != 'INFO' else 'WARN'
+        self.cc.set_logger(worker, namespace, 'ERROR')
+        request_time = int(time.time() * 1000)
+        affected_loggers = self.cc.set_logger(worker, namespace, new_level)
+        assert len(affected_loggers) >= 1
+        for logger in affected_loggers:
+            assert logger.startswith(namespace)
+
+        assert self.loggers_set(new_level, request_time, namespace, 
workers=[worker])
+        assert self.loggers_set(initial_level, None, namespace, 
workers=self.cc.nodes[1:])
+
+        # Force all loggers to get updated by setting the root namespace to
+        # two different levels
+        # This guarantees that their last-modified times will be updated
+        resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster')
+        assert resp is None
+
+        new_root = 'INFO'
+        request_time = int(time.time() * 1000)
+        resp = self.cc.set_logger(worker, 'root', new_root, 'cluster')
+        assert resp is None
+        wait_until(
+            lambda: self.loggers_set(new_root, request_time),
+            # This should be super quick--just a write+read of the config 
topic, which workers are constantly polling
+            timeout_sec=10,
+            err_msg="Log level for root namespace was not adjusted in a 
reasonable amount of time."
+        )
+
+        new_level = 'DEBUG'
+        request_time = int(time.time() * 1000)
+        resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+        assert resp is None
+        wait_until(
+            lambda: self.loggers_set(new_level, request_time, namespace),
+            timeout_sec=10,
+            err_msg='Log level for namespace ' + namespace + ' was not 
adjusted in a reasonable amount of time.'
+        )
+
+        prior_all_loggers = [self.cc.get_all_loggers(node) for node in 
self.cc.nodes]
+        resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+        assert resp is None
+
+        prior_namespace = namespace
+        new_namespace = None
+        for logger, level in prior_all_loggers[0].items():
+            if logger != 'root' and not logger.startswith(namespace):

Review Comment:
   That assertion (`num_loggers = 3`) does not necessarily have to be true for 
this condition to be met, since altering a logging namespace will also cause 
child namespaces to be modified and begin to appear in the content of the `GET 
/admin/loggers` endpoint.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -297,8 +297,8 @@ public void 
testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
         // When automatic topic creation is disabled on the broker
         brokerProps.put("auto.create.topics.enable", "false");
         connect = connectBuilder
-            .brokerProps(brokerProps)
             .numWorkers(1)
+            .brokerProps(brokerProps)

Review Comment:
   Whoops! Reverted. Accidentally left in from an earlier iteration of the 
refactored `Builder` API.



##########
tests/kafkatest/tests/connect/connect_distributed_test.py:
##########
@@ -81,7 +81,13 @@ def __init__(self, test_context):
         self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
         self.schemas = True
 
-    def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, 
timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, 
include_filestream_connectors=False):
+    def setup_services(self,

Review Comment:
   I wanted to be able to explicitly set the number of workers to 3. The 
current default is 3, but I wanted updates to that default to not affect the 
new dynamic logging system test.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -917,4 +924,27 @@ public void resetConnectorOffsets(String connName, 
Callback<Message> callback) {
      * @param cb callback to invoke upon completion
      */
     protected abstract void modifyConnectorOffsets(String connName, 
Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);
+
+    @Override
+    public LoggerLevel loggerLevel(String logger) {
+        return loggers.level(logger);
+    }
+
+    @Override
+    public Map<String, LoggerLevel> allLoggerLevels() {
+        return loggers.allLevels();
+    }
+
+    @Override
+    public List<String> setWorkerLoggerLevel(String namespace, String 
desiredLevelStr) {
+        Level level = Level.toLevel(desiredLevelStr.toUpperCase(Locale.ROOT), 
null);
+
+        if (level == null) {
+            log.warn("Ignoring request to set invalid level '{}' for namespace 
{}", desiredLevelStr, namespace);
+            return Collections.emptyList();
+        }

Review Comment:
   Yep, both guesses are accurate 👍 
   At a high level, I figured it was better to put this guard in place since the log 
level string doesn't necessarily have to originate from a source that went 
through the REST API of this worker, or even a worker with this version. 



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##########
@@ -164,6 +171,13 @@ interface UpdateListener {
          * @param restartRequest the {@link RestartRequest restart request}
          */
         void onRestartRequest(RestartRequest restartRequest);
+
+        /**
+         * Invoked when a dynamic log level adjustment has been read

Review Comment:
   This part I think should stay as-is. If we end up supporting persistent 
cluster-wide updates, we may use this exact same internal API to communicate 
them to herders.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -917,4 +924,27 @@ public void resetConnectorOffsets(String connName, 
Callback<Message> callback) {
      * @param cb callback to invoke upon completion
      */
     protected abstract void modifyConnectorOffsets(String connName, 
Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb);
+
+    @Override
+    public LoggerLevel loggerLevel(String logger) {
+        return loggers.level(logger);
+    }
+
+    @Override
+    public Map<String, LoggerLevel> allLoggerLevels() {
+        return loggers.allLevels();
+    }
+
+    @Override
+    public List<String> setWorkerLoggerLevel(String namespace, String 
desiredLevelStr) {
+        Level level = Level.toLevel(desiredLevelStr.toUpperCase(Locale.ROOT), 
null);
+
+        if (level == null) {
+            log.warn("Ignoring request to set invalid level '{}' for namespace 
{}", desiredLevelStr, namespace);
+            return Collections.emptyList();
+        }

Review Comment:
   Yep, both guesses are accurate 👍 
   At a high level, I figured it was better to put this guard in place since the log 
level string doesn't necessarily have to originate from a source that went 
through the REST API of this worker, or even a worker with this version. 



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/LoggerLevel.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class LoggerLevel {
+
+    private final String level;
+    private final Long lastModified;
+
+    public LoggerLevel(
+            @JsonProperty("level") String level,
+            @JsonProperty("last_modified") Long lastModified
+    ) {
+        this.level = Objects.requireNonNull(level, "level may not be null");
+        this.lastModified = lastModified;
+    }
+
+    @JsonProperty
+    public String level() {
+        return level;
+    }
+
+    @JsonProperty("last_modified")
+    public Long lastModified() {
+        return lastModified;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        LoggerLevel that = (LoggerLevel) o;
+        return level.equals(that.level) && Objects.equals(lastModified, 
that.lastModified);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(level, lastModified);
+    }
+
+    @Override
+    public String toString() {
+        return "LoggerLevel{"
+                + "level='" + level + '\''

Review Comment:
   Ask my IDE, this is all auto-generated 🤷



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * <p>
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+    private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+    /**
+     * Log4j uses "root" (case-insensitive) as name of the root logger.
+     */
+    private static final String ROOT_LOGGER_NAME = "root";
+
+    private final Time time;
+    private final Map<String, Long> lastModifiedTimes;
+
+    public Loggers(Time time) {
+        this.time = time;
+        this.lastModifiedTimes = new HashMap<>();
+    }
+
+    /**
+     * Retrieve the current level for a single logger.
+     * @param logger the name of the logger to retrieve the level for; may not 
be null
+     * @return the current level (falling back on the effective level if 
necessary) of the logger,
+     * or null if no logger with the specified name exists
+     */
+    public synchronized LoggerLevel level(String logger) {
+        Objects.requireNonNull(logger, "Logger may not be null");
+
+        org.apache.log4j.Logger foundLogger = null;
+        if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) {
+            foundLogger = rootLogger();
+        } else {
+            Enumeration<org.apache.log4j.Logger> en = currentLoggers();
+            // search within existing loggers for the given name.
+            // using LogManger.getLogger() will create a logger if it doesn't 
exist
+            // (potential leak since these don't get cleaned up).
+            while (en.hasMoreElements()) {
+                org.apache.log4j.Logger l = en.nextElement();
+                if (logger.equals(l.getName())) {
+                    foundLogger = l;
+                    break;
+                }
+            }
+        }
+
+        if (foundLogger == null) {
+            log.warn("Unable to find level for logger {}", logger);
+            return null;
+        }
+
+        return loggerLevel(foundLogger);
+    }
+
+    /**
+     * Retrieve the current levels of all known loggers
+     * @return the levels of all known loggers; may be empty, but never null
+     */
+    public synchronized Map<String, LoggerLevel> allLevels() {
+        Map<String, LoggerLevel> result = new TreeMap<>();
+
+        Enumeration<org.apache.log4j.Logger> enumeration = currentLoggers();
+        Collections.list(enumeration)
+                .stream()
+                .filter(logger -> logger.getLevel() != null)
+                .forEach(logger -> result.put(logger.getName(), 
loggerLevel(logger)));
+
+        org.apache.log4j.Logger root = rootLogger();
+        if (root.getLevel() != null) {
+            result.put(ROOT_LOGGER_NAME, loggerLevel(root));
+        }
+
+        return result;
+    }
+
+    /**
+     * Set the level for the specified logger and all of its children
+     * @param namespace the name of the logger to adjust along with its 
children; may not be nul

Review Comment:
   Was afraid to apply directly via the UI after rebasing onto trunk locally, 
but I did apply this in a follow-up commit that I've pushed now. Good catch!



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java:
##########
@@ -94,122 +88,54 @@ public Response listLoggers() {
     public Response getLogger(final @PathParam("logger") String namedLogger) {
         Objects.requireNonNull(namedLogger, "require non-null name");
 
-        Logger logger = null;
-        if (ROOT_LOGGER_NAME.equalsIgnoreCase(namedLogger)) {
-            logger = rootLogger();
-        } else {
-            Enumeration<Logger> en = currentLoggers();
-            // search within existing loggers for the given name.
-            // using LogManger.getLogger() will create a logger if it doesn't 
exist
-            // (potential leak since these don't get cleaned up).
-            while (en.hasMoreElements()) {
-                Logger l = en.nextElement();
-                if (namedLogger.equals(l.getName())) {
-                    logger = l;
-                    break;
-                }
-            }
-        }
-        if (logger == null) {
+        LoggerLevel loggerLevel = herder.loggerLevel(namedLogger);
+        if (loggerLevel == null)
             throw new NotFoundException("Logger " + namedLogger + " not 
found.");
-        } else {
-            return Response.ok(effectiveLevelToMap(logger)).build();
-        }
-    }
 
+        return Response.ok(loggerLevel).build();
+    }
 
     /**
      * Adjust level of a named logger. If the name corresponds to an ancestor, 
then the log level is applied to all child loggers.
      *
-     * @param namedLogger name of the logger
+     * @param namespace name of the logger
      * @param levelMap a map that is expected to contain one key 'level', and 
a value that is one of the log4j levels:
      *                 DEBUG, ERROR, FATAL, INFO, TRACE, WARN
      * @return names of loggers whose levels were modified
      */
     @PUT
     @Path("/{logger}")
     @Operation(summary = "Set the log level for the specified logger")
-    public Response setLevel(final @PathParam("logger") String namedLogger,
-                             final Map<String, String> levelMap) {
-        String desiredLevelStr = levelMap.get("level");
-        if (desiredLevelStr == null) {
-            throw new BadRequestException("Desired 'level' parameter was not 
specified in request.");
+    @SuppressWarnings("fallthrough")
+    public Response setLevel(final @PathParam("logger") String namespace,
+                             final Map<String, String> levelMap,
+                             @DefaultValue("worker") @QueryParam("scope") 
@Parameter(description = "The scope for the logging modification 
(single-worker, cluster-wide, etc.)") String scope) {
+        if (scope == null) {
+            log.warn("Received null scope in request to adjust logging level; 
will default to {}", WORKER_SCOPE);

Review Comment:
   Yep, Yash is right. You can verify this by running the 
`StandaloneWorkerTest::testDynamicLogging` case, setting a breakpoint after the 
first call to set a log level without a scope, and then examining the logs when 
the breakpoint is hit.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * <p>
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+    private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+    /**
+     * Log4j uses "root" (case-insensitive) as name of the root logger.
+     */
+    private static final String ROOT_LOGGER_NAME = "root";
+
+    private final Time time;
+    private final Map<String, Long> lastModifiedTimes;
+
+    public Loggers(Time time) {
+        this.time = time;
+        this.lastModifiedTimes = new HashMap<>();
+    }
+
+    /**
+     * Retrieve the current level for a single logger.
+     * @param logger the name of the logger to retrieve the level for; may not 
be null
+     * @return the current level (falling back on the effective level if 
necessary) of the logger,
+     * or null if no logger with the specified name exists
+     */
+    public synchronized LoggerLevel level(String logger) {

Review Comment:
   Yeah, the synchronization here is fairly coarse-grained but I don't think 
it's likely to be a bottleneck (who's going to be issuing hundreds of dynamic 
log requests a second?).
   
   It's also necessary, even for this read-only operation, to ensure that log 
levels and their last-modified times are kept in sync.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util.clusters;
+
+import org.apache.kafka.connect.cli.ConnectStandalone;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG;
+import static 
org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.standalone.StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG;
+
+/**
+ * Start a standalone embedded connect worker. Internally, this class will 
spin up a Kafka and Zk cluster,
+ * setup any tmp directories and clean up them on them. Methods on the same
+ * {@code EmbeddedConnectStandalone} are not guaranteed to be thread-safe.
+ */
+public class EmbeddedConnectStandalone extends EmbeddedConnect {
+
+    private static final Logger log = 
LoggerFactory.getLogger(EmbeddedConnectStandalone.class);
+
+    private static final String REST_HOST_NAME = "localhost";
+
+    private final Map<String, String> workerProps;
+    private final String offsetsFile;
+
+    private WorkerHandle connectWorker;
+
+    private EmbeddedConnectStandalone(
+            int numBrokers,
+            Properties brokerProps,
+            boolean maskExitProcedures,
+            Map<String, String> clientProps,
+            Map<String, String> workerProps,
+            String offsetsFile
+    ) {
+        super(numBrokers, brokerProps, maskExitProcedures, clientProps);
+        this.workerProps = workerProps;
+        this.offsetsFile = offsetsFile;
+    }
+
+    @Override
+    public void startConnect() {
+        log.info("Starting standalone Connect worker");
+
+        workerProps.put(BOOTSTRAP_SERVERS_CONFIG, kafka().bootstrapServers());
+        // use a random available port
+        workerProps.put(LISTENERS_CONFIG, "HTTP://" + REST_HOST_NAME + ":0");
+
+        workerProps.putIfAbsent(OFFSET_STORAGE_FILE_FILENAME_CONFIG, 
offsetsFile);
+        workerProps.putIfAbsent(KEY_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
+        workerProps.putIfAbsent(VALUE_CONVERTER_CLASS_CONFIG, 
"org.apache.kafka.connect.storage.StringConverter");
+        workerProps.putIfAbsent(PLUGIN_DISCOVERY_CONFIG, "hybrid_fail");
+
+        Connect connect = new ConnectStandalone().startConnect(workerProps);
+        connectWorker = new WorkerHandle("standalone", connect);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("EmbeddedConnectStandalone(numBrokers= %d, 
workerProps= %s)",
+            numBrokers,
+            workerProps);
+    }
+
+    @Override
+    protected Set<WorkerHandle> workers() {
+        return connectWorker != null
+                ? Collections.singleton(connectWorker)
+                : Collections.emptySet();
+    }
+
+    public static class Builder extends 
EmbeddedConnectBuilder<EmbeddedConnectStandalone, Builder> {
+
+        private String offsetsFile = null;
+
+        public Builder offsetsFile(String offsetsFile) {
+            this.offsetsFile = offsetsFile;
+            return this;
+        }
+
+        @Override
+        protected EmbeddedConnectStandalone build(
+                int numBrokers,
+                Properties brokerProps,
+                boolean maskExitProcedures,
+                Map<String, String> clientProps,
+                Map<String, String> workerProps
+        ) {
+            if (offsetsFile == null)
+                offsetsFile = tempOffsetsFile();
+
+            return new EmbeddedConnectStandalone(
+                    numBrokers,
+                    brokerProps,
+                    maskExitProcedures,
+                    clientProps,
+                    workerProps,
+                    offsetsFile
+            );
+        }
+
+        private String tempOffsetsFile() {
+            try {
+                return TestUtils
+                        .tempFile("connect-standalone-offsets", "")

Review Comment:
   Was afraid to apply directly via the UI after rebasing onto trunk locally, 
but I did apply this in a follow-up commit that I've pushed now. Good catch, 
thanks!



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * <p>
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+    private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+    /**
+     * Log4j uses "root" (case-insensitive) as name of the root logger.
+     */
+    private static final String ROOT_LOGGER_NAME = "root";
+
+    private final Time time;
+    private final Map<String, Long> lastModifiedTimes;
+
+    public Loggers(Time time) {
+        this.time = time;
+        this.lastModifiedTimes = new HashMap<>();
+    }
+
+    /**
+     * Retrieve the current level for a single logger.
+     * @param logger the name of the logger to retrieve the level for; may not 
be null
+     * @return the current level (falling back on the effective level if 
necessary) of the logger,
+     * or null if no logger with the specified name exists
+     */
+    public synchronized LoggerLevel level(String logger) {
+        Objects.requireNonNull(logger, "Logger may not be null");
+
+        org.apache.log4j.Logger foundLogger = null;
+        if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) {
+            foundLogger = rootLogger();
+        } else {
+            Enumeration<org.apache.log4j.Logger> en = currentLoggers();
+            // search within existing loggers for the given name.
+            // using LogManger.getLogger() will create a logger if it doesn't 
exist
+            // (potential leak since these don't get cleaned up).
+            while (en.hasMoreElements()) {
+                org.apache.log4j.Logger l = en.nextElement();
+                if (logger.equals(l.getName())) {
+                    foundLogger = l;
+                    break;
+                }
+            }
+        }
+
+        if (foundLogger == null) {
+            log.warn("Unable to find level for logger {}", logger);
+            return null;
+        }
+
+        return loggerLevel(foundLogger);
+    }
+
+    /**
+     * Retrieve the current levels of all known loggers
+     * @return the levels of all known loggers; may be empty, but never null
+     */
+    public synchronized Map<String, LoggerLevel> allLevels() {
+        Map<String, LoggerLevel> result = new TreeMap<>();
+
+        Enumeration<org.apache.log4j.Logger> enumeration = currentLoggers();
+        Collections.list(enumeration)
+                .stream()
+                .filter(logger -> logger.getLevel() != null)
+                .forEach(logger -> result.put(logger.getName(), 
loggerLevel(logger)));
+
+        org.apache.log4j.Logger root = rootLogger();
+        if (root.getLevel() != null) {
+            result.put(ROOT_LOGGER_NAME, loggerLevel(root));
+        }
+
+        return result;
+    }
+
+    /**
+     * Set the level for the specified logger and all of its children
+     * @param namespace the name of the logger to adjust along with its 
children; may not be nul
+     * @param level the level to set for the logger and its children; may be 
null

Review Comment:
   Good call. I was initially hoping to leave in support for nullifying levels 
in case we wanted to have some kind of reset API, but after experimenting with a 
new unit test in the `LoggersTest` class, it's been more trouble than it's 
worth. Updated to disallow null levels.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -271,6 +271,14 @@ public static String RESTART_KEY(String connectorName) {
             .field(ONLY_FAILED_FIELD_NAME, Schema.BOOLEAN_SCHEMA)
             .build();
 
+    public static final String LOGGER_CLUSTER_PREFIX = "logger-cluster-";
+    public static String LOGGER_CLUSTER_KEY(String namespace) {

Review Comment:
   Yeah, I figured it was better to follow convention here but agree that it's 
very strange. Probably better for a follow-up PR (this one is already quite 
large).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manages logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * <p>
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+    private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+    /**
+     * Log4j uses "root" (case-insensitive) as name of the root logger.
+     */
+    private static final String ROOT_LOGGER_NAME = "root";
+
+    private final Time time;
+    private final Map<String, Long> lastModifiedTimes;
+
+    public Loggers(Time time) {
+        this.time = time;
+        this.lastModifiedTimes = new HashMap<>();
+    }
+
+    /**
+     * Retrieve the current level for a single logger.

Review Comment:
   It doesn't seem like we follow this convention in the code base (see the 
[ConnectMetrics](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java),
 
[KafkaConfigBackingStore](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java),
 and 
[AbstractConnectCli](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java)
 classes for a few examples).
   
   Do you find it personally easier to read with this style? I don't think we 
need to adhere to the recommendations of Oracle on writing Javadocs if there 
isn't tangible benefit to doing so, but if it does make a difference to you or 
someone else, I'm happy to tweak the Javadocs on this and all other 
newly-introduced classes in this PR.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manages logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * <p>
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+    private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+    /**
+     * Log4j uses "root" (case-insensitive) as name of the root logger.
+     */
+    private static final String ROOT_LOGGER_NAME = "root";
+
+    private final Time time;
+    private final Map<String, Long> lastModifiedTimes;
+
+    public Loggers(Time time) {
+        this.time = time;
+        this.lastModifiedTimes = new HashMap<>();
+    }
+
+    /**
+     * Retrieve the current level for a single logger.
+     * @param logger the name of the logger to retrieve the level for; may not 
be null
+     * @return the current level (falling back on the effective level if 
necessary) of the logger,
+     * or null if no logger with the specified name exists
+     */
+    public synchronized LoggerLevel level(String logger) {
+        Objects.requireNonNull(logger, "Logger may not be null");
+
+        org.apache.log4j.Logger foundLogger = null;
+        if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) {
+            foundLogger = rootLogger();
+        } else {
+            Enumeration<org.apache.log4j.Logger> en = currentLoggers();
+            // search within existing loggers for the given name.
+            // using LogManager.getLogger() will create a logger if it doesn't 
exist
+            // (potential leak since these don't get cleaned up).
+            while (en.hasMoreElements()) {
+                org.apache.log4j.Logger l = en.nextElement();
+                if (logger.equals(l.getName())) {
+                    foundLogger = l;
+                    break;
+                }
+            }
+        }
+
+        if (foundLogger == null) {
+            log.warn("Unable to find level for logger {}", logger);
+            return null;
+        }
+
+        return loggerLevel(foundLogger);
+    }
+
+    /**
+     * Retrieve the current levels of all known loggers
+     * @return the levels of all known loggers; may be empty, but never null
+     */
+    public synchronized Map<String, LoggerLevel> allLevels() {
+        Map<String, LoggerLevel> result = new TreeMap<>();
+
+        Enumeration<org.apache.log4j.Logger> enumeration = currentLoggers();
+        Collections.list(enumeration)
+                .stream()
+                .filter(logger -> logger.getLevel() != null)
+                .forEach(logger -> result.put(logger.getName(), 
loggerLevel(logger)));
+
+        org.apache.log4j.Logger root = rootLogger();
+        if (root.getLevel() != null) {
+            result.put(ROOT_LOGGER_NAME, loggerLevel(root));
+        }
+
+        return result;
+    }
+
+    /**
+     * Set the level for the specified logger and all of its children
+     * @param namespace the name of the logger to adjust along with its 
children; may not be null
+     * @param level the level to set for the logger and its children; may be 
null
+     * @return all loggers that were affected by this action; may be empty, 
but never null
+     */
+    public synchronized List<String> setLevel(String namespace, Level level) {
+        Objects.requireNonNull(namespace, "Logging namespace may not be null");
+
+        log.info("Setting level of namespace {} and children to {}", 
namespace, level);
+        List<org.apache.log4j.Logger> childLoggers = loggers(namespace);
+
+        List<String> result = new ArrayList<>();
+        for (org.apache.log4j.Logger logger: childLoggers) {
+            setLevel(logger, level);
+            result.add(logger.getName());
+        }
+        Collections.sort(result);
+
+        return result;
+    }
+
+    /**
+     * Retrieve all known loggers within a given namespace, creating an 
ancestor logger for that
+     * namespace if one does not already exist
+     * @param namespace the namespace that the loggers should fall under; may 
not be null
+     * @return all loggers that fall under the given namespace; never null, 
and will always contain
+     * at least one logger (the ancestor logger for the namespace)
+     */
+    private synchronized List<org.apache.log4j.Logger> loggers(String 
namespace) {
+        Objects.requireNonNull(namespace, "Logging namespace may not be null");
+
+        if (ROOT_LOGGER_NAME.equalsIgnoreCase(namespace)) {
+            List<org.apache.log4j.Logger> result = 
Collections.list(currentLoggers());
+            result.add(rootLogger());
+            return result;
+        }
+
+        List<org.apache.log4j.Logger> result = new ArrayList<>();
+        org.apache.log4j.Logger ancestorLogger = lookupLogger(namespace);
+        Enumeration<org.apache.log4j.Logger> en = currentLoggers();
+        boolean present = false;
+        while (en.hasMoreElements()) {
+            org.apache.log4j.Logger current = en.nextElement();
+            if (current.getName().startsWith(namespace)) {
+                result.add(current);
+            }
+            if (namespace.equals(current.getName())) {
+                present = true;
+            }
+        }
+
+        if (!present) {
+            result.add(ancestorLogger);
+        }

Review Comment:
   I'm a little hesitant to introduce the use of, e.g., a `HashSet` because it 
doesn't appear that the `Logger` class overrides `equals` and `hashCode`.
   
   We could eliminate `present` and do something like this in its place:
   
   ```java
   if (result.stream().noneMatch(logger -> namespace.equals(logger.getName()))) 
{
       result.add(ancestorLogger);
   }
   ```
   
   but ultimately, I don't think that's much easier to read, and I'm hesitant 
to tweak anything here that may end up causing a regression in the existing 
worker-scoped API. Thoughts?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/LoggerLevel.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+public class LoggerLevel {
+
+    private final String level;
+    private final Long lastModified;
+
+    public LoggerLevel(
+            @JsonProperty("level") String level,
+            @JsonProperty("last_modified") Long lastModified
+    ) {
+        this.level = Objects.requireNonNull(level, "level may not be null");
+        this.lastModified = lastModified;
+    }
+
+    @JsonProperty
+    public String level() {
+        return level;
+    }
+
+    @JsonProperty("last_modified")
+    public Long lastModified() {
+        return lastModified;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        LoggerLevel that = (LoggerLevel) o;
+        return level.equals(that.level) && Objects.equals(lastModified, 
that.lastModified);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(level, lastModified);
+    }
+
+    @Override
+    public String toString() {
+        return "LoggerLevel{"
+                + "level='" + level + '\''

Review Comment:
   Ask my IDE, this is all auto-generated 🤷



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##########
@@ -122,6 +122,13 @@ public interface ConfigBackingStore {
     default void claimWritePrivileges() {
     }
 
+    /**
+     * Store a new level for the specified logging namespace (and all of its 
children)

Review Comment:
   Good call, done 👍 



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##########
@@ -122,6 +122,13 @@ public interface ConfigBackingStore {
     default void claimWritePrivileges() {
     }
 
+    /**
+     * Store a new level for the specified logging namespace (and all of its 
children)

Review Comment:
   Good call, done 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to