C0urante commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1360768631
########## checkstyle/suppressions.xml: ########## @@ -139,6 +139,8 @@ files="Worker(SinkTask|SourceTask|Coordinator).java"/> <suppress checks="ParameterNumber" files="(ConfigKeyInfo|DistributedHerder).java"/> + <suppress checks="DefaultComesLast" + files="LoggingResource.java" /> Review Comment: Switching on enums still requires a default case; there's a good explanation on [StackOverflow](https://stackoverflow.com/questions/5013194/why-is-default-required-for-a-switch-on-an-enum) for why. ########## tests/kafkatest/tests/connect/connect_distributed_test.py: ########## @@ -375,6 +381,159 @@ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, met wait_until(lambda: self.is_paused(self.source, node), timeout_sec=120, err_msg="Failed to see connector startup in PAUSED state") + @cluster(num_nodes=5) + def test_dynamic_logging(self): + """ + Test out the REST API for dynamically adjusting logging levels, on both a single-worker and cluster-wide basis. + """ + + self.setup_services(num_workers=3) + self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) + self.cc.start() + + worker = self.cc.nodes[0] + prior_all_loggers = self.cc.get_all_loggers(worker) + self.logger.debug("Listed all loggers via REST API: %s", str(prior_all_loggers)) + assert prior_all_loggers is not None + assert 'root' in prior_all_loggers + # We need root and at least one other namespace (the other namespace is checked + # later on to make sure that it hasn't changed) + assert len(prior_all_loggers) >= 2 + for logger in prior_all_loggers.values(): + assert logger['last_modified'] is None + + namespace = None + for logger in prior_all_loggers.keys(): + if logger != 'root': + namespace = logger + break + assert namespace is not None + + initial_level = self.cc.get_logger(worker, namespace)['level'].upper() + # Make sure we pick a different one than what's already set for that namespace + new_level = 'INFO' if initial_level != 'INFO' else 'WARN' + self.cc.set_logger(worker, namespace, 'ERROR') + request_time = int(time.time() * 1000) + affected_loggers = self.cc.set_logger(worker, namespace, new_level) + assert len(affected_loggers) >= 1 + for logger in affected_loggers: + assert logger.startswith(namespace) + + assert self.loggers_set(new_level, request_time, namespace, workers=[worker]) + assert self.loggers_set(initial_level, None, namespace, workers=self.cc.nodes[1:]) + + # Force all loggers to get updated by setting the root namespace to + # two different levels + # This guarantees that their last-modified times will be updated + resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster') + assert resp is None + + new_root = 'INFO' + request_time = int(time.time() * 1000) + resp = self.cc.set_logger(worker, 'root', new_root, 'cluster') + assert resp is None + wait_until( + lambda: self.loggers_set(new_root, request_time), + # This should be super quick--just a write+read of the config topic, which workers are constantly polling + timeout_sec=10, + err_msg="Log level for root namespace was not adjusted in a reasonable amount of time." + ) + + new_level = 'DEBUG' + request_time = int(time.time() * 1000) + resp = self.cc.set_logger(worker, namespace, new_level, 'cluster') + assert resp is None + wait_until( + lambda: self.loggers_set(new_level, request_time, namespace), + timeout_sec=10, + err_msg='Log level for namespace ' + namespace + ' was not adjusted in a reasonable amount of time.' + ) + + prior_all_loggers = [self.cc.get_all_loggers(node) for node in self.cc.nodes] + resp = self.cc.set_logger(worker, namespace, new_level, 'cluster') + assert resp is None + + prior_namespace = namespace + new_namespace = None + for logger, level in prior_all_loggers[0].items(): + if logger != 'root' and not logger.startswith(namespace): Review Comment: That assertion (`num_loggers = 3`) does not necessarily have to be true for this condition to be met, since altering a logging namespace will also cause child namespaces to be modified and begin to appear in the content of the `GET /admin/loggers` endpoint. ########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ########## @@ -297,8 +297,8 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce // When automatic topic creation is disabled on the broker brokerProps.put("auto.create.topics.enable", "false"); connect = connectBuilder - .brokerProps(brokerProps) .numWorkers(1) + .brokerProps(brokerProps) Review Comment: Whoops! Reverted. Accidentally left in from an earlier iteration of the refactored `Builder` API. ########## tests/kafkatest/tests/connect/connect_distributed_test.py: ########## @@ -81,7 +81,13 @@ def __init__(self, test_context): self.value_converter = "org.apache.kafka.connect.json.JsonConverter" self.schemas = True - def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False): + def setup_services(self, Review Comment: I wanted to be able to explicitly set the number of workers to 3. The current default is 3, but I wanted updates to that default to not affect the new dynamic logging system test. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -917,4 +924,27 @@ public void resetConnectorOffsets(String connName, Callback<Message> callback) { * @param cb callback to invoke upon completion */ protected abstract void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb); + + @Override + public LoggerLevel loggerLevel(String logger) { + return loggers.level(logger); + } + + @Override + public Map<String, LoggerLevel> allLoggerLevels() { + return loggers.allLevels(); + } + + @Override + public List<String> setWorkerLoggerLevel(String namespace, String desiredLevelStr) { + Level level = Level.toLevel(desiredLevelStr.toUpperCase(Locale.ROOT), null); + + if (level == null) { + log.warn("Ignoring request to set invalid level '{}' for namespace {}", desiredLevelStr, namespace); + return Collections.emptyList(); + } Review Comment: Yep, both guesses are accurate 👍 High-level, I figured it was better to put this guard in place since the log level string doesn't necessarily have to originate from a source that went through the REST API of this worker, or even a worker with this version. ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java: ########## @@ -164,6 +171,13 @@ interface UpdateListener { * @param restartRequest the {@link RestartRequest restart request} */ void onRestartRequest(RestartRequest restartRequest); + + /** + * Invoked when a dynamic log level adjustment has been read Review Comment: This part I think should stay as-is. If we end up supporting persistent cluster-wide updates, we may use this exact same internal API to communicate them to herders. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -917,4 +924,27 @@ public void resetConnectorOffsets(String connName, Callback<Message> callback) { * @param cb callback to invoke upon completion */ protected abstract void modifyConnectorOffsets(String connName, Map<Map<String, ?>, Map<String, ?>> offsets, Callback<Message> cb); + + @Override + public LoggerLevel loggerLevel(String logger) { + return loggers.level(logger); + } + + @Override + public Map<String, LoggerLevel> allLoggerLevels() { + return loggers.allLevels(); + } + + @Override + public List<String> setWorkerLoggerLevel(String namespace, String desiredLevelStr) { + Level level = Level.toLevel(desiredLevelStr.toUpperCase(Locale.ROOT), null); + + if (level == null) { + log.warn("Ignoring request to set invalid level '{}' for namespace {}", desiredLevelStr, namespace); + return Collections.emptyList(); + } Review Comment: Yep, both guesses are accurate 👍 High-level, I figured it was better to put this guard in place since the log level string doesn't necessarily have to originate from a source that went through the REST API of this worker, or even a worker with this version. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/LoggerLevel.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.rest.entities; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class LoggerLevel { + + private final String level; + private final Long lastModified; + + public LoggerLevel( + @JsonProperty("level") String level, + @JsonProperty("last_modified") Long lastModified + ) { + this.level = Objects.requireNonNull(level, "level may not be null"); + this.lastModified = lastModified; + } + + @JsonProperty + public String level() { + return level; + } + + @JsonProperty("last_modified") + public Long lastModified() { + return lastModified; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + LoggerLevel that = (LoggerLevel) o; + return level.equals(that.level) && Objects.equals(lastModified, that.lastModified); + } + + @Override + public int hashCode() { + return Objects.hash(level, lastModified); + } + + @Override + public String toString() { + return "LoggerLevel{" + + "level='" + level + '\'' Review Comment: Ask my IDE, this is all auto-generated 🤷 ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying + * of logging levels. + * <p> + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + + private static final Logger log = LoggerFactory.getLogger(Loggers.class); + + /** + * Log4j uses "root" (case-insensitive) as name of the root logger. + */ + private static final String ROOT_LOGGER_NAME = "root"; + + private final Time time; + private final Map<String, Long> lastModifiedTimes; + + public Loggers(Time time) { + this.time = time; + this.lastModifiedTimes = new HashMap<>(); + } + + /** + * Retrieve the current level for a single logger. + * @param logger the name of the logger to retrieve the level for; may not be null + * @return the current level (falling back on the effective level if necessary) of the logger, + * or null if no logger with the specified name exists + */ + public synchronized LoggerLevel level(String logger) { + Objects.requireNonNull(logger, "Logger may not be null"); + + org.apache.log4j.Logger foundLogger = null; + if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) { + foundLogger = rootLogger(); + } else { + Enumeration<org.apache.log4j.Logger> en = currentLoggers(); + // search within existing loggers for the given name. + // using LogManger.getLogger() will create a logger if it doesn't exist + // (potential leak since these don't get cleaned up). + while (en.hasMoreElements()) { + org.apache.log4j.Logger l = en.nextElement(); + if (logger.equals(l.getName())) { + foundLogger = l; + break; + } + } + } + + if (foundLogger == null) { + log.warn("Unable to find level for logger {}", logger); + return null; + } + + return loggerLevel(foundLogger); + } + + /** + * Retrieve the current levels of all known loggers + * @return the levels of all known loggers; may be empty, but never null + */ + public synchronized Map<String, LoggerLevel> allLevels() { + Map<String, LoggerLevel> result = new TreeMap<>(); + + Enumeration<org.apache.log4j.Logger> enumeration = currentLoggers(); + Collections.list(enumeration) + .stream() + .filter(logger -> logger.getLevel() != null) + .forEach(logger -> result.put(logger.getName(), loggerLevel(logger))); + + org.apache.log4j.Logger root = rootLogger(); + if (root.getLevel() != null) { + result.put(ROOT_LOGGER_NAME, loggerLevel(root)); + } + + return result; + } + + /** + * Set the level for the specified logger and all of its children + * @param namespace the name of the logger to adjust along with its children; may not be nul Review Comment: Was afraid to apply directly via the UI after rebasing onto trunk locally, but I did apply this in a follow-up commit that I've pushed now. Good catch! ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java: ########## @@ -94,122 +88,54 @@ public Response listLoggers() { public Response getLogger(final @PathParam("logger") String namedLogger) { Objects.requireNonNull(namedLogger, "require non-null name"); - Logger logger = null; - if (ROOT_LOGGER_NAME.equalsIgnoreCase(namedLogger)) { - logger = rootLogger(); - } else { - Enumeration<Logger> en = currentLoggers(); - // search within existing loggers for the given name. - // using LogManger.getLogger() will create a logger if it doesn't exist - // (potential leak since these don't get cleaned up). - while (en.hasMoreElements()) { - Logger l = en.nextElement(); - if (namedLogger.equals(l.getName())) { - logger = l; - break; - } - } - } - if (logger == null) { + LoggerLevel loggerLevel = herder.loggerLevel(namedLogger); + if (loggerLevel == null) throw new NotFoundException("Logger " + namedLogger + " not found."); - } else { - return Response.ok(effectiveLevelToMap(logger)).build(); - } - } + return Response.ok(loggerLevel).build(); + } /** * Adjust level of a named logger. If the name corresponds to an ancestor, then the log level is applied to all child loggers. * - * @param namedLogger name of the logger + * @param namespace name of the logger * @param levelMap a map that is expected to contain one key 'level', and a value that is one of the log4j levels: * DEBUG, ERROR, FATAL, INFO, TRACE, WARN * @return names of loggers whose levels were modified */ @PUT @Path("/{logger}") @Operation(summary = "Set the log level for the specified logger") - public Response setLevel(final @PathParam("logger") String namedLogger, - final Map<String, String> levelMap) { - String desiredLevelStr = levelMap.get("level"); - if (desiredLevelStr == null) { - throw new BadRequestException("Desired 'level' parameter was not specified in request."); + @SuppressWarnings("fallthrough") + public Response setLevel(final @PathParam("logger") String namespace, + final Map<String, String> levelMap, + @DefaultValue("worker") @QueryParam("scope") @Parameter(description = "The scope for the logging modification (single-worker, cluster-wide, etc.)") String scope) { + if (scope == null) { + log.warn("Received null scope in request to adjust logging level; will default to {}", WORKER_SCOPE); Review Comment: Yep, Yash is right. You can verify this by running the `StandaloneWorkerTest::testDynamicLogging` case, setting a breakpoint after the first call to set a log level without a scope, and then examining the logs when the breakpoint is hit. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying + * of logging levels. + * <p> + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + + private static final Logger log = LoggerFactory.getLogger(Loggers.class); + + /** + * Log4j uses "root" (case-insensitive) as name of the root logger. + */ + private static final String ROOT_LOGGER_NAME = "root"; + + private final Time time; + private final Map<String, Long> lastModifiedTimes; + + public Loggers(Time time) { + this.time = time; + this.lastModifiedTimes = new HashMap<>(); + } + + /** + * Retrieve the current level for a single logger. + * @param logger the name of the logger to retrieve the level for; may not be null + * @return the current level (falling back on the effective level if necessary) of the logger, + * or null if no logger with the specified name exists + */ + public synchronized LoggerLevel level(String logger) { Review Comment: Yeah, the synchronization here is fairly coarse-grained but I don't think it's likely to be a bottleneck (who's going to be issuing hundreds of dynamic log requests a second?). It's also necessary for this read-only operation to ensure that log levels and their last-modified times are kept in-sync. ########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util.clusters; + +import org.apache.kafka.connect.cli.ConnectStandalone; +import org.apache.kafka.connect.runtime.Connect; +import org.apache.kafka.test.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG; +import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG; +import static org.apache.kafka.connect.runtime.standalone.StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG; + +/** + * Start a standalone embedded connect worker. Internally, this class will spin up a Kafka and Zk cluster, + * setup any tmp directories and clean up them on them. Methods on the same + * {@code EmbeddedConnectStandalone} are not guaranteed to be thread-safe. + */ +public class EmbeddedConnectStandalone extends EmbeddedConnect { + + private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectStandalone.class); + + private static final String REST_HOST_NAME = "localhost"; + + private final Map<String, String> workerProps; + private final String offsetsFile; + + private WorkerHandle connectWorker; + + private EmbeddedConnectStandalone( + int numBrokers, + Properties brokerProps, + boolean maskExitProcedures, + Map<String, String> clientProps, + Map<String, String> workerProps, + String offsetsFile + ) { + super(numBrokers, brokerProps, maskExitProcedures, clientProps); + this.workerProps = workerProps; + this.offsetsFile = offsetsFile; + } + + @Override + public void startConnect() { + log.info("Starting standalone Connect worker"); + + workerProps.put(BOOTSTRAP_SERVERS_CONFIG, kafka().bootstrapServers()); + // use a random available port + workerProps.put(LISTENERS_CONFIG, "HTTP://" + REST_HOST_NAME + ":0"); + + workerProps.putIfAbsent(OFFSET_STORAGE_FILE_FILENAME_CONFIG, offsetsFile); + workerProps.putIfAbsent(KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + workerProps.putIfAbsent(VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + workerProps.putIfAbsent(PLUGIN_DISCOVERY_CONFIG, "hybrid_fail"); + + Connect connect = new ConnectStandalone().startConnect(workerProps); + connectWorker = new WorkerHandle("standalone", connect); + } + + @Override + public String toString() { + return String.format("EmbeddedConnectStandalone(numBrokers= %d, workerProps= %s)", + numBrokers, + workerProps); + } + + @Override + protected Set<WorkerHandle> workers() { + return connectWorker != null + ? Collections.singleton(connectWorker) + : Collections.emptySet(); + } + + public static class Builder extends EmbeddedConnectBuilder<EmbeddedConnectStandalone, Builder> { + + private String offsetsFile = null; + + public Builder offsetsFile(String offsetsFile) { + this.offsetsFile = offsetsFile; + return this; + } + + @Override + protected EmbeddedConnectStandalone build( + int numBrokers, + Properties brokerProps, + boolean maskExitProcedures, + Map<String, String> clientProps, + Map<String, String> workerProps + ) { + if (offsetsFile == null) + offsetsFile = tempOffsetsFile(); + + return new EmbeddedConnectStandalone( + numBrokers, + brokerProps, + maskExitProcedures, + clientProps, + workerProps, + offsetsFile + ); + } + + private String tempOffsetsFile() { + try { + return TestUtils + .tempFile("connect-standalone-offsets", "") Review Comment: Was afraid to apply directly via the UI after rebasing onto trunk locally, but I did apply this in a follow-up commit that I've pushed now. Good catch, thanks! ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying + * of logging levels. + * <p> + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + + private static final Logger log = LoggerFactory.getLogger(Loggers.class); + + /** + * Log4j uses "root" (case-insensitive) as name of the root logger. + */ + private static final String ROOT_LOGGER_NAME = "root"; + + private final Time time; + private final Map<String, Long> lastModifiedTimes; + + public Loggers(Time time) { + this.time = time; + this.lastModifiedTimes = new HashMap<>(); + } + + /** + * Retrieve the current level for a single logger. + * @param logger the name of the logger to retrieve the level for; may not be null + * @return the current level (falling back on the effective level if necessary) of the logger, + * or null if no logger with the specified name exists + */ + public synchronized LoggerLevel level(String logger) { + Objects.requireNonNull(logger, "Logger may not be null"); + + org.apache.log4j.Logger foundLogger = null; + if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) { + foundLogger = rootLogger(); + } else { + Enumeration<org.apache.log4j.Logger> en = currentLoggers(); + // search within existing loggers for the given name. + // using LogManger.getLogger() will create a logger if it doesn't exist + // (potential leak since these don't get cleaned up). + while (en.hasMoreElements()) { + org.apache.log4j.Logger l = en.nextElement(); + if (logger.equals(l.getName())) { + foundLogger = l; + break; + } + } + } + + if (foundLogger == null) { + log.warn("Unable to find level for logger {}", logger); + return null; + } + + return loggerLevel(foundLogger); + } + + /** + * Retrieve the current levels of all known loggers + * @return the levels of all known loggers; may be empty, but never null + */ + public synchronized Map<String, LoggerLevel> allLevels() { + Map<String, LoggerLevel> result = new TreeMap<>(); + + Enumeration<org.apache.log4j.Logger> enumeration = currentLoggers(); + Collections.list(enumeration) + .stream() + .filter(logger -> logger.getLevel() != null) + .forEach(logger -> result.put(logger.getName(), loggerLevel(logger))); + + org.apache.log4j.Logger root = rootLogger(); + if (root.getLevel() != null) { + result.put(ROOT_LOGGER_NAME, loggerLevel(root)); + } + + return result; + } + + /** + * Set the level for the specified logger and all of its children + * @param namespace the name of the logger to adjust along with its children; may not be nul + * @param level the level to set for the logger and its children; may be null Review Comment: Good call. I was initially hoping to leave in support for nullifying levels in case we wanted to have some kind of reset API but after experimenting with a new unit test in the `LoggersTest` class it's been more trouble than it's worth. Updated to disallow null levels. ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ########## @@ -271,6 +271,14 @@ public static String RESTART_KEY(String connectorName) { .field(ONLY_FAILED_FIELD_NAME, Schema.BOOLEAN_SCHEMA) .build(); + public static final String LOGGER_CLUSTER_PREFIX = "logger-cluster-"; + public static String LOGGER_CLUSTER_KEY(String namespace) { Review Comment: Yeah, I figured it was better to follow convention here but agree that it's very strange. Probably better for a follow-up PR (this one is already quite large). ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying + * of logging levels. + * <p> + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + + private static final Logger log = LoggerFactory.getLogger(Loggers.class); + + /** + * Log4j uses "root" (case-insensitive) as name of the root logger. + */ + private static final String ROOT_LOGGER_NAME = "root"; + + private final Time time; + private final Map<String, Long> lastModifiedTimes; + + public Loggers(Time time) { + this.time = time; + this.lastModifiedTimes = new HashMap<>(); + } + + /** + * Retrieve the current level for a single logger. Review Comment: It doesn't seem like we follow this convention in the code base (see the [ConnectMetrics](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java), [KafkaConfigBackingStore](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java), and [AbstractConnectCli](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java) classes for a few examples). Do you find it personally easier to read with this style? I don't think we need to adhere to the recommendations of Oracle on writing Javadocs if there isn't tangible benefit to doing so, but if it does make a difference to you or someone else, I'm happy to tweak the Javadocs on this and all other newly-introduced classes in this PR.. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying + * of logging levels. + * <p> + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + + private static final Logger log = LoggerFactory.getLogger(Loggers.class); + + /** + * Log4j uses "root" (case-insensitive) as name of the root logger. + */ + private static final String ROOT_LOGGER_NAME = "root"; + + private final Time time; + private final Map<String, Long> lastModifiedTimes; + + public Loggers(Time time) { + this.time = time; + this.lastModifiedTimes = new HashMap<>(); + } + + /** + * Retrieve the current level for a single logger. + * @param logger the name of the logger to retrieve the level for; may not be null + * @return the current level (falling back on the effective level if necessary) of the logger, + * or null if no logger with the specified name exists + */ + public synchronized LoggerLevel level(String logger) { + Objects.requireNonNull(logger, "Logger may not be null"); + + org.apache.log4j.Logger foundLogger = null; + if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) { + foundLogger = rootLogger(); + } else { + Enumeration<org.apache.log4j.Logger> en = currentLoggers(); + // search within existing loggers for the given name. + // using LogManger.getLogger() will create a logger if it doesn't exist + // (potential leak since these don't get cleaned up). + while (en.hasMoreElements()) { + org.apache.log4j.Logger l = en.nextElement(); + if (logger.equals(l.getName())) { + foundLogger = l; + break; + } + } + } + + if (foundLogger == null) { + log.warn("Unable to find level for logger {}", logger); + return null; + } + + return loggerLevel(foundLogger); + } + + /** + * Retrieve the current levels of all known loggers + * @return the levels of all known loggers; may be empty, but never null + */ + public synchronized Map<String, LoggerLevel> allLevels() { + Map<String, LoggerLevel> result = new TreeMap<>(); + + Enumeration<org.apache.log4j.Logger> enumeration = currentLoggers(); + Collections.list(enumeration) + .stream() + .filter(logger -> logger.getLevel() != null) + .forEach(logger -> result.put(logger.getName(), loggerLevel(logger))); + + org.apache.log4j.Logger root = rootLogger(); + if (root.getLevel() != null) { + result.put(ROOT_LOGGER_NAME, loggerLevel(root)); + } + + return result; + } + + /** + * Set the level for the specified logger and all of its children + * @param namespace the name of the logger to adjust along with its children; may not be nul + * @param level the level to set for the logger and its children; may be null + * @return all loggers that were affected by this action; may be empty, but never null + */ + public synchronized List<String> setLevel(String namespace, Level level) { + Objects.requireNonNull(namespace, "Logging namespace may not be null"); + + log.info("Setting level of namespace {} and children to {}", namespace, level); + List<org.apache.log4j.Logger> childLoggers = loggers(namespace); + + List<String> result = new ArrayList<>(); + for (org.apache.log4j.Logger logger: childLoggers) { + setLevel(logger, level); + result.add(logger.getName()); + } + Collections.sort(result); + + return result; + } + + /** + * Retrieve all known loggers within a given namespace, creating an ancestor logger for that + * namespace if one does not already exist + * @param namespace the namespace that the loggers should fall under; may not be null + * @return all loggers that fall under the given namespace; never null, and will always contain + * at least one logger (the ancestor logger for the namespace) + */ + private synchronized List<org.apache.log4j.Logger> loggers(String namespace) { + Objects.requireNonNull(namespace, "Logging namespace may not be null"); + + if (ROOT_LOGGER_NAME.equalsIgnoreCase(namespace)) { + List<org.apache.log4j.Logger> result = Collections.list(currentLoggers()); + result.add(rootLogger()); + return result; + } + + List<org.apache.log4j.Logger> result = new ArrayList<>(); + org.apache.log4j.Logger ancestorLogger = lookupLogger(namespace); + Enumeration<org.apache.log4j.Logger> en = currentLoggers(); + boolean present = false; + while (en.hasMoreElements()) { + org.apache.log4j.Logger current = en.nextElement(); + if (current.getName().startsWith(namespace)) { + result.add(current); + } + if (namespace.equals(current.getName())) { + present = true; + } + } + + if (!present) { + result.add(ancestorLogger); + } Review Comment: I'm a little hesitant to introduce the use of, e.g., a `HashSet` because it doesn't appear that the `Logger` class overrides `equals` and `hashCode`. We could eliminate `present` and do something like this in its place: ```java if (result.stream().noneMatch(logger -> namespace.equals(logger.getName()))) { result.add(ancestorLogger); } ``` but ultimately, I don't think that's much easier to read, and I'm hesitant to tweak anything here that may end up causing a regression in the existing worker-scoped API. Thoughts? ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/LoggerLevel.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.rest.entities; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class LoggerLevel { + + private final String level; + private final Long lastModified; + + public LoggerLevel( + @JsonProperty("level") String level, + @JsonProperty("last_modified") Long lastModified + ) { + this.level = Objects.requireNonNull(level, "level may not be null"); + this.lastModified = lastModified; + } + + @JsonProperty + public String level() { + return level; + } + + @JsonProperty("last_modified") + public Long lastModified() { + return lastModified; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + LoggerLevel that = (LoggerLevel) o; + return level.equals(that.level) && Objects.equals(lastModified, that.lastModified); + } + + @Override + public int hashCode() { + return Objects.hash(level, lastModified); + } + + @Override + public String toString() { + return "LoggerLevel{" + + "level='" + level + '\'' Review Comment: Ask my IDE, this is all auto-generated 🤷 ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java: ########## @@ -122,6 +122,13 @@ public interface ConfigBackingStore { default void claimWritePrivileges() { } + /** + * Store a new level for the specified logging namespace (and all of its children) Review Comment: Good call, done 👍 ########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java: ########## @@ -122,6 +122,13 @@ public interface ConfigBackingStore { default void claimWritePrivileges() { } + /** + * Store a new level for the specified logging namespace (and all of its children) Review Comment: Good call, done 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org