Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1360796008

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and querying
+ * of logging levels.
+ *
+ * This class is thread-safe; concurrent calls to all of its public methods from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+    private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+    /**
+     * Log4j uses "root" (case-insensitive) as name of the root logger.
+     */
+    private static final String ROOT_LOGGER_NAME = "root";
+
+    private final Time time;
+    private final Map lastModifiedTimes;
+
+    public Loggers(Time time) {
+        this.time = time;
+        this.lastModifiedTimes = new HashMap<>();
+    }
+
+    /**
+     * Retrieve the current level for a single logger.

Review Comment:
It doesn't seem like we follow this convention in the code base (see the [ConnectMetrics](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java), [KafkaConfigBackingStore](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java), and [AbstractConnectCli](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java) classes for a few examples). Do you find it personally easier to read with this style? I don't think we need to adhere to the recommendations of Oracle on writing Javadocs if there isn't tangible benefit to doing so, but if it does make a difference to you or someone else, I'm happy to tweak the Javadocs on this and all other newly-introduced classes in this PR.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante merged PR #14538: URL: https://github.com/apache/kafka/pull/14538
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on PR #14538: URL: https://github.com/apache/kafka/pull/14538#issuecomment-177221 Thanks for the reviews, all!
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1364507968

## tests/kafkatest/tests/connect/connect_distributed_test.py: ##
@@ -375,6 +381,159 @@ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, met
             wait_until(lambda: self.is_paused(self.source, node), timeout_sec=120, err_msg="Failed to see connector startup in PAUSED state")

+    @cluster(num_nodes=5)
+    def test_dynamic_logging(self):
+        """
+        Test out the REST API for dynamically adjusting logging levels, on both a single-worker and cluster-wide basis.
+        """
+
+        self.setup_services(num_workers=3)
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        worker = self.cc.nodes[0]
+        prior_all_loggers = self.cc.get_all_loggers(worker)
+        self.logger.debug("Listed all loggers via REST API: %s", str(prior_all_loggers))
+        assert prior_all_loggers is not None
+        assert 'root' in prior_all_loggers
+        # We need root and at least one other namespace (the other namespace is checked
+        # later on to make sure that it hasn't changed)
+        assert len(prior_all_loggers) >= 2
+        for logger in prior_all_loggers.values():
+            assert logger['last_modified'] is None
+
+        namespace = None
+        for logger in prior_all_loggers.keys():
+            if logger != 'root':
+                namespace = logger
+                break
+        assert namespace is not None
+
+        initial_level = self.cc.get_logger(worker, namespace)['level'].upper()
+        # Make sure we pick a different one than what's already set for that namespace
+        new_level = 'INFO' if initial_level != 'INFO' else 'WARN'
+        self.cc.set_logger(worker, namespace, 'ERROR')
+        request_time = int(time.time() * 1000)
+        affected_loggers = self.cc.set_logger(worker, namespace, new_level)
+        assert len(affected_loggers) >= 1
+        for logger in affected_loggers:
+            assert logger.startswith(namespace)
+
+        assert self.loggers_set(new_level, request_time, namespace, workers=[worker])
+        assert self.loggers_set(initial_level, None, namespace, workers=self.cc.nodes[1:])
+
+        # Force all loggers to get updated by setting the root namespace to
+        # two different levels
+        # This guarantees that their last-modified times will be updated
+        resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster')
+        assert resp is None
+
+        new_root = 'INFO'
+        request_time = int(time.time() * 1000)
+        resp = self.cc.set_logger(worker, 'root', new_root, 'cluster')
+        assert resp is None
+        wait_until(
+            lambda: self.loggers_set(new_root, request_time),
+            # This should be super quick--just a write+read of the config topic, which workers are constantly polling
+            timeout_sec=10,
+            err_msg="Log level for root namespace was not adjusted in a reasonable amount of time."
+        )
+
+        new_level = 'DEBUG'
+        request_time = int(time.time() * 1000)
+        resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+        assert resp is None
+        wait_until(
+            lambda: self.loggers_set(new_level, request_time, namespace),
+            timeout_sec=10,
+            err_msg='Log level for namespace ' + namespace + ' was not adjusted in a reasonable amount of time.'
+        )
+
+        prior_all_loggers = [self.cc.get_all_loggers(node) for node in self.cc.nodes]
+        resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+        assert resp is None
+
+        prior_namespace = namespace
+        new_namespace = None
+        for logger, level in prior_all_loggers[0].items():
+            if logger != 'root' and not logger.startswith(namespace):

Review Comment:
This is true for the modifications we made to `namespace`, but not the ones we made to the root logger (I should have clarified this in my initial response, sorry).
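To make the distinction between namespace-scoped and root-scoped changes concrete, here is a rough toy model in Python. It is only a sketch and not the Connect implementation: the `ToyLoggers` class, its `set_level` method, and the logger names are all hypothetical. It assumes, as the test comment about setting root "to two different levels" suggests, that a logger's last-modified time only moves when its level actually changes, and it uses a dot-delimited prefix to decide which loggers are children of a namespace.

```python
import time

ROOT = "root"


class ToyLoggers:
    """Hypothetical stand-in for per-worker logger state (not Connect's Loggers class)."""

    def __init__(self, names, default_level="INFO"):
        self.levels = {name: default_level for name in names}
        self.levels.setdefault(ROOT, default_level)
        # Nothing has been modified yet, mirroring the 'last_modified is None' assertion
        self.last_modified = {name: None for name in self.levels}

    def set_level(self, namespace, level, now_ms=None):
        """Set `level` on `namespace` and its children; return the loggers that changed."""
        now_ms = int(time.time() * 1000) if now_ms is None else now_ms
        affected = []
        for name in sorted(self.levels):
            in_scope = (
                namespace == ROOT  # assumption: root covers every logger
                or name == namespace
                or name.startswith(namespace + ".")
            )
            # Assumption: only an actual level change bumps the last-modified time
            if in_scope and self.levels[name] != level:
                self.levels[name] = level
                self.last_modified[name] = now_ms
                affected.append(name)
        return affected


loggers = ToyLoggers(["org.a", "org.a.b", "org.c"])
loggers.set_level("org.a", "DEBUG", now_ms=1)  # touches org.a and org.a.b only
loggers.set_level("root", "DEBUG", now_ms=2)   # org.a and org.a.b are already DEBUG
loggers.set_level("root", "INFO", now_ms=3)    # every logger changes level here
print(loggers.last_modified)
```

Under these assumptions, the second root-level change is the one that touches every logger, including ones outside `org.a`, which is why a namespace chosen later cannot be assumed to be untouched by the root updates.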
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
gharris1727 commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1364474243 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying Review Comment: ```suggestion * Manages logging levels on a single worker. 
Supports dynamic adjustment and querying ``` ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java: ## @@ -16,603 +16,14 @@ */ package org.apache.kafka.connect.util.clusters; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.connect.runtime.AbstractStatus; -import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; -import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; -import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.ws.rs.core.Response; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; -import java.util.stream.Collectors; - -import static org.apache.kafka.test.TestUtils.waitForCondition; - /** - * A set of common assertions that can be applied to a Connect cluster during integration testing + * @deprecated Use {@link ConnectAssertions} instead. 
*/ -public class EmbeddedConnectClusterAssertions { - -private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectClusterAssertions.class); -public static final long WORKER_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(5); -public static final long VALIDATION_DURATION_MS = TimeUnit.SECONDS.toMillis(30); -public static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.MINUTES.toMillis(2); -// Creating a connector requires two rounds of rebalance; destroying one only requires one -// Assume it'll take ~half the time to destroy a connector as it does to create one -public static final long CONNECTOR_SHUTDOWN_DURATION_MS = TimeUnit.MINUTES.toMillis(1); -private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS = TimeUnit.SECONDS.toMillis(60); - -private final EmbeddedConnectCluster connect; - -EmbeddedConnectClusterAssertions(EmbeddedConnectCluster connect) { -this.connect = connect; -} - -/** - * Assert that at least the requested number of workers are up and running. - * - * @param numWorkers the number of online workers - */ -public void assertAtLeastNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException { -try { -waitForCondition( -() -> checkWorkersUp(numWorkers, (actual, expected) -> actual >= expected).orElse(false), -WORKER_SETUP_DURATION_MS, -"Didn't meet the minimum requested number of online workers: " + numWorkers); -} catch (AssertionError e) { -throw new AssertionError(detailMessage, e); -} -} - -/** - * Assert that at least the requested number of workers are up and running. - * - * @param numWorkers the number of online workers - */ -public void assertExactlyNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException { -try { -waitForCondition( -() -> ch
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
gharris1727 commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1364457214

## tests/kafkatest/tests/connect/connect_distributed_test.py: ##
@@ -375,6 +381,159 @@ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, met
             wait_until(lambda: self.is_paused(self.source, node), timeout_sec=120, err_msg="Failed to see connector startup in PAUSED state")

+    @cluster(num_nodes=5)
+    def test_dynamic_logging(self):
+        """
+        Test out the REST API for dynamically adjusting logging levels, on both a single-worker and cluster-wide basis.
+        """
+
+        self.setup_services(num_workers=3)
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        worker = self.cc.nodes[0]
+        prior_all_loggers = self.cc.get_all_loggers(worker)
+        self.logger.debug("Listed all loggers via REST API: %s", str(prior_all_loggers))
+        assert prior_all_loggers is not None
+        assert 'root' in prior_all_loggers
+        # We need root and at least one other namespace (the other namespace is checked
+        # later on to make sure that it hasn't changed)
+        assert len(prior_all_loggers) >= 2
+        for logger in prior_all_loggers.values():
+            assert logger['last_modified'] is None
+
+        namespace = None
+        for logger in prior_all_loggers.keys():
+            if logger != 'root':
+                namespace = logger
+                break
+        assert namespace is not None
+
+        initial_level = self.cc.get_logger(worker, namespace)['level'].upper()
+        # Make sure we pick a different one than what's already set for that namespace
+        new_level = 'INFO' if initial_level != 'INFO' else 'WARN'
+        self.cc.set_logger(worker, namespace, 'ERROR')
+        request_time = int(time.time() * 1000)
+        affected_loggers = self.cc.set_logger(worker, namespace, new_level)
+        assert len(affected_loggers) >= 1
+        for logger in affected_loggers:
+            assert logger.startswith(namespace)
+
+        assert self.loggers_set(new_level, request_time, namespace, workers=[worker])
+        assert self.loggers_set(initial_level, None, namespace, workers=self.cc.nodes[1:])
+
+        # Force all loggers to get updated by setting the root namespace to
+        # two different levels
+        # This guarantees that their last-modified times will be updated
+        resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster')
+        assert resp is None
+
+        new_root = 'INFO'
+        request_time = int(time.time() * 1000)
+        resp = self.cc.set_logger(worker, 'root', new_root, 'cluster')
+        assert resp is None
+        wait_until(
+            lambda: self.loggers_set(new_root, request_time),
+            # This should be super quick--just a write+read of the config topic, which workers are constantly polling
+            timeout_sec=10,
+            err_msg="Log level for root namespace was not adjusted in a reasonable amount of time."
+        )
+
+        new_level = 'DEBUG'
+        request_time = int(time.time() * 1000)
+        resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+        assert resp is None
+        wait_until(
+            lambda: self.loggers_set(new_level, request_time, namespace),
+            timeout_sec=10,
+            err_msg='Log level for namespace ' + namespace + ' was not adjusted in a reasonable amount of time.'
+        )
+
+        prior_all_loggers = [self.cc.get_all_loggers(node) for node in self.cc.nodes]
+        resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+        assert resp is None
+
+        prior_namespace = namespace
+        new_namespace = None
+        for logger, level in prior_all_loggers[0].items():
+            if logger != 'root' and not logger.startswith(namespace):

Review Comment:
> altering a logging namespace will also cause child namespaces to be modified and begin to appear in the content of the GET /admin/loggers endpoint.

Don't those get filtered out by the `startswith(namespace)` check? `new_namespace` can't be a child of the `namespace` which was picked initially.
There's an assertion that a `new_namespace` is found later, so this isn't a problem.
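The filtering argument in this comment can be illustrated with a small standalone sketch. The helper name `pick_unrelated_namespace` and the logger names below are hypothetical and chosen only for illustration; the condition itself is the one quoted from the test.

```python
def pick_unrelated_namespace(all_loggers, namespace):
    """Return the first logger that is neither root nor prefixed by `namespace`, else None."""
    for logger in all_loggers:
        # Same condition as the quoted test line: skip root and anything under `namespace`
        if logger != "root" and not logger.startswith(namespace):
            return logger
    return None


# Hypothetical logger names, as they might be listed after children were added
known_loggers = [
    "root",
    "org.apache.kafka.connect",
    "org.apache.kafka.connect.runtime",  # child of the first namespace, skipped
    "org.reflections",
]

new_namespace = pick_unrelated_namespace(known_loggers, "org.apache.kafka.connect")
assert new_namespace is not None  # mirrors the later assertion mentioned in the comment
print(new_namespace)
```

Because `startswith` is a plain string prefix match, it skips children of `namespace` and also any unrelated logger that merely shares the same leading characters; that makes the filter conservative rather than incorrect for the purpose of picking a second namespace.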
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1362527136 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java: ## @@ -72,115 +41,42 @@ import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG; /** - * Start an embedded connect worker. Internally, this class will spin up a Kafka and Zk cluster, setup any tmp - * directories and clean up them on them. Methods on the same {@code EmbeddedConnectCluster} are + * Start an embedded connect cluster. Internally, this class will spin up a Kafka and Zk cluster, set up any tmp + * directories, and clean them up on exit. Methods on the same {@code EmbeddedConnectCluster} are * not guaranteed to be thread-safe. */ -public class EmbeddedConnectCluster { +public class EmbeddedConnectCluster extends EmbeddedConnect { private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectCluster.class); -public static final int DEFAULT_NUM_BROKERS = 1; public static final int DEFAULT_NUM_WORKERS = 1; -private static final Properties DEFAULT_BROKER_CONFIG = new Properties(); private static final String REST_HOST_NAME = "localhost"; private static final String DEFAULT_WORKER_NAME_PREFIX = "connect-worker-"; private final Set connectCluster; -private final EmbeddedKafkaCluster kafkaCluster; -private final HttpClient httpClient; private final Map workerProps; private final String connectClusterName; -private final int numBrokers; private final int numInitialWorkers; -private final boolean maskExitProcedures; private final String workerNamePrefix; private final AtomicInteger nextWorkerId = new AtomicInteger(0); -private final EmbeddedConnectClusterAssertions assertions; -// we should keep the original class loader and set it back after connector stopped since the connector will change the class loader, -// and then, the Mockito will use the unexpected class loader to generate the wrong proxy instance, which 
makes mock failed -private final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); -private EmbeddedConnectCluster(String name, Map workerProps, int numWorkers, - int numBrokers, Properties brokerProps, - boolean maskExitProcedures, - Map additionalKafkaClusterClientConfigs) { +private EmbeddedConnectCluster( +int numBrokers, +Properties brokerProps, +boolean maskExitProcedures, +Map clientProps, +Map workerProps, +String name, +int numWorkers +) { +super(numBrokers, brokerProps, maskExitProcedures, clientProps); this.workerProps = workerProps; this.connectClusterName = name; -this.numBrokers = numBrokers; -this.kafkaCluster = new EmbeddedKafkaCluster(numBrokers, brokerProps, additionalKafkaClusterClientConfigs); this.connectCluster = new LinkedHashSet<>(); -this.httpClient = new HttpClient(); this.numInitialWorkers = numWorkers; -this.maskExitProcedures = maskExitProcedures; // leaving non-configurable for now this.workerNamePrefix = DEFAULT_WORKER_NAME_PREFIX; -this.assertions = new EmbeddedConnectClusterAssertions(this); -} - -/** - * A more graceful way to handle abnormal exit of services in integration tests. - */ -public Exit.Procedure exitProcedure = (code, message) -> { -if (code != 0) { -String exitMessage = "Abrupt service exit with code " + code + " and message " + message; -log.warn(exitMessage); -throw new UngracefulShutdownException(exitMessage); -} -}; - -/** - * A more graceful way to handle abnormal halt of services in integration tests. - */ -public Exit.Procedure haltProcedure = (code, message) -> { -if (code != 0) { -String haltMessage = "Abrupt service halt with code " + code + " and message " + message; -log.warn(haltMessage); -throw new UngracefulShutdownException(haltMessage); -} -}; - -/** - * Start the connect cluster and the embedded Kafka and Zookeeper cluster. 
- */ -public void start() { -if (maskExitProcedures) { -Exit.setExitProcedure(exitProcedure); -Exit.setHaltProcedure(haltProcedure); -} -kafkaCluster.start(); -startConnect(); -try { -httpClient.start(); -} catch (Exception e) { -throw new ConnectException("Failed to start HTTP client", e); -} -} - -/** - * Stop the connect cluster and the embedded Kafka and Zookeeper cluster. - * Clean up any temp directories creat
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1362530005 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java: ## @@ -332,722 +186,46 @@ public Set workers() { return new LinkedHashSet<>(connectCluster); } -/** - * Configure a connector. If the connector does not already exist, a new one will be created and - * the given configuration will be applied to it. - * - * @param connName the name of the connector - * @param connConfig the intended configuration - * @throws ConnectRestException if the REST api returns error status - * @throws ConnectException if the configuration fails to be serialized or if the request could not be sent - */ -public String configureConnector(String connName, Map connConfig) { -String url = endpointForResource(String.format("connectors/%s/config", connName)); -return putConnectorConfig(url, connConfig); -} - -/** - * Validate a given connector configuration. If the configuration validates or - * has a configuration error, an instance of {@link ConfigInfos} is returned. If the validation fails - * an exception is thrown. 
- * - * @param connClassName the name of the connector class - * @param connConfigthe intended configuration - * @throws ConnectRestException if the REST api returns error status - * @throws ConnectException if the configuration fails to serialize/deserialize or if the request failed to send - */ -public ConfigInfos validateConnectorConfig(String connClassName, Map connConfig) { -String url = endpointForResource(String.format("connector-plugins/%s/config/validate", connClassName)); -String response = putConnectorConfig(url, connConfig); -ConfigInfos configInfos; -try { -configInfos = new ObjectMapper().readValue(response, ConfigInfos.class); -} catch (IOException e) { -throw new ConnectException("Unable deserialize response into a ConfigInfos object"); -} -return configInfos; -} - -/** - * Execute a PUT request with the given connector configuration on the given URL endpoint. - * - * @param urlthe full URL of the endpoint that corresponds to the given REST resource - * @param connConfig the intended configuration - * @throws ConnectRestException if the REST api returns error status - * @throws ConnectException if the configuration fails to be serialized or if the request could not be sent - */ -protected String putConnectorConfig(String url, Map connConfig) { -ObjectMapper mapper = new ObjectMapper(); -String content; -try { -content = mapper.writeValueAsString(connConfig); -} catch (IOException e) { -throw new ConnectException("Could not serialize connector configuration and execute PUT request"); -} -Response response = requestPut(url, content); -if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { -return responseToString(response); -} -throw new ConnectRestException(response.getStatus(), -"Could not execute PUT request. Error response: " + responseToString(response)); -} - -/** - * Delete an existing connector. 
- * - * @param connName name of the connector to be deleted - * @throws ConnectRestException if the REST API returns error status - * @throws ConnectException for any other error. - */ -public void deleteConnector(String connName) { -String url = endpointForResource(String.format("connectors/%s", connName)); -Response response = requestDelete(url); -if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) { -throw new ConnectRestException(response.getStatus(), -"Could not execute DELETE request. Error response: " + responseToString(response)); -} -} - -/** - * Stop an existing connector. - * - * @param connName name of the connector to be paused - * @throws ConnectRestException if the REST API returns error status - * @throws ConnectException for any other error. - */ -public void stopConnector(String connName) { -String url = endpointForResource(String.format("connectors/%s/stop", connName)); -Response response = requestPut(url, ""); -if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) { -throw new ConnectRestException(response.getStatus(), -"Could not execute PUT request. Error response: " + responseToString(response)); -} -} - -/** - * Pause an existing connector. - * - * @param connName name of the connector to be paused - * @throws ConnectRestException if th
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1362528851

## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java: ##
@@ -302,27 +177,6 @@ public String getName() {
         return connectClusterName;
     }

-    /**
-     * Get the workers that are up and running.
-     *
-     * @return the list of handles of the online workers
-     */
-    public Set activeWorkers() {
-        ObjectMapper mapper = new ObjectMapper();
-        return connectCluster.stream()
-                .filter(w -> {
-                    try {
-                        mapper.readerFor(ServerInfo.class)
-                                .readValue(responseToString(requestGet(w.url().toString())));
-                        return true;
-                    } catch (ConnectException | IOException e) {
-                        // Worker failed to respond. Consider it's offline
-                        return false;
-                    }
-                })
-                .collect(Collectors.toSet());
-    }

Review Comment:
This is also migrated directly to the `EmbeddedConnect` class.
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1362527738

## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java: ##
@@ -225,28 +121,6 @@ public void removeWorker(WorkerHandle worker) {
         connectCluster.remove(worker);
     }

-    private void stopWorker(WorkerHandle worker) {
-        try {
-            log.info("Stopping worker {}", worker);
-            worker.stop();
-        } catch (UngracefulShutdownException e) {
-            log.warn("Worker {} did not shutdown gracefully", worker);
-        } catch (Exception e) {
-            log.error("Could not stop connect", e);
-            throw new RuntimeException("Could not stop worker", e);
-        }
-    }
-
-    /**
-     * Set a new timeout for REST requests to each worker in the cluster. Useful if a request
-     * is expected to block, since the time spent awaiting that request can be reduced
-     * and test runtime bloat can be avoided.
-     * @param requestTimeoutMs the new timeout in milliseconds; must be positive
-     */
-    public void requestTimeout(long requestTimeoutMs) {
-        connectCluster.forEach(worker -> worker.requestTimeout(requestTimeoutMs));
-    }

Review Comment:
This is also migrated directly to the `EmbeddedConnect` class.
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1362527136 ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java: ## @@ -72,115 +41,42 @@ import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG; /** - * Start an embedded connect worker. Internally, this class will spin up a Kafka and Zk cluster, setup any tmp - * directories and clean up them on them. Methods on the same {@code EmbeddedConnectCluster} are + * Start an embedded connect cluster. Internally, this class will spin up a Kafka and Zk cluster, set up any tmp + * directories, and clean them up on exit. Methods on the same {@code EmbeddedConnectCluster} are * not guaranteed to be thread-safe. */ -public class EmbeddedConnectCluster { +public class EmbeddedConnectCluster extends EmbeddedConnect { private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectCluster.class); -public static final int DEFAULT_NUM_BROKERS = 1; public static final int DEFAULT_NUM_WORKERS = 1; -private static final Properties DEFAULT_BROKER_CONFIG = new Properties(); private static final String REST_HOST_NAME = "localhost"; private static final String DEFAULT_WORKER_NAME_PREFIX = "connect-worker-"; private final Set connectCluster; -private final EmbeddedKafkaCluster kafkaCluster; -private final HttpClient httpClient; private final Map workerProps; private final String connectClusterName; -private final int numBrokers; private final int numInitialWorkers; -private final boolean maskExitProcedures; private final String workerNamePrefix; private final AtomicInteger nextWorkerId = new AtomicInteger(0); -private final EmbeddedConnectClusterAssertions assertions; -// we should keep the original class loader and set it back after connector stopped since the connector will change the class loader, -// and then, the Mockito will use the unexpected class loader to generate the wrong proxy instance, which 
makes mock failed -private final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); -private EmbeddedConnectCluster(String name, Map workerProps, int numWorkers, - int numBrokers, Properties brokerProps, - boolean maskExitProcedures, - Map additionalKafkaClusterClientConfigs) { +private EmbeddedConnectCluster( +int numBrokers, +Properties brokerProps, +boolean maskExitProcedures, +Map clientProps, +Map workerProps, +String name, +int numWorkers +) { +super(numBrokers, brokerProps, maskExitProcedures, clientProps); this.workerProps = workerProps; this.connectClusterName = name; -this.numBrokers = numBrokers; -this.kafkaCluster = new EmbeddedKafkaCluster(numBrokers, brokerProps, additionalKafkaClusterClientConfigs); this.connectCluster = new LinkedHashSet<>(); -this.httpClient = new HttpClient(); this.numInitialWorkers = numWorkers; -this.maskExitProcedures = maskExitProcedures; // leaving non-configurable for now this.workerNamePrefix = DEFAULT_WORKER_NAME_PREFIX; -this.assertions = new EmbeddedConnectClusterAssertions(this); -} - -/** - * A more graceful way to handle abnormal exit of services in integration tests. - */ -public Exit.Procedure exitProcedure = (code, message) -> { -if (code != 0) { -String exitMessage = "Abrupt service exit with code " + code + " and message " + message; -log.warn(exitMessage); -throw new UngracefulShutdownException(exitMessage); -} -}; - -/** - * A more graceful way to handle abnormal halt of services in integration tests. - */ -public Exit.Procedure haltProcedure = (code, message) -> { -if (code != 0) { -String haltMessage = "Abrupt service halt with code " + code + " and message " + message; -log.warn(haltMessage); -throw new UngracefulShutdownException(haltMessage); -} -}; - -/** - * Start the connect cluster and the embedded Kafka and Zookeeper cluster. 
- */ -public void start() { -if (maskExitProcedures) { -Exit.setExitProcedure(exitProcedure); -Exit.setHaltProcedure(haltProcedure); -} -kafkaCluster.start(); -startConnect(); -try { -httpClient.start(); -} catch (Exception e) { -throw new ConnectException("Failed to start HTTP client", e); -} -} - -/** - * Stop the connect cluster and the embedded Kafka and Zookeeper cluster. - * Clean up any temp directories creat
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1362238029 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerTest.java: ## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@Category(IntegrationTest.class) +public class StandaloneWorkerTest { Review Comment: Ah, good catch! Done. 
## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java: ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util.clusters; + +import org.apache.kafka.connect.cli.ConnectStandalone; +import org.apache.kafka.connect.runtime.Connect; +import org.apache.kafka.test.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG; +import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG; +import static org.apache.kafka.connect.runtime.standalone.StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG; + +/** + * Start a standalone embedded connect worker. 
Internally, this class will spin up a Kafka and Zk cluster, + * setup any tmp directories and clean up them on them. Methods on the same Review Comment: Shamelessly copied from the existing [EmbeddedConnectCluster Javadocs](https://github.com/apache/kafka/blob/e7e399b9409b42f82d7ce57b99a461c465e5849d/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java#L74-L78). I'll tweak the language to match your suggestion, which I like. ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectBuilder.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + *
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
yashmayya commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1362109618 ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerTest.java: ## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.integration; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone; +import org.apache.kafka.test.IntegrationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +@Category(IntegrationTest.class) +public class StandaloneWorkerTest { Review Comment: The existing integration test classes use the suffix `IntegrationTest` in their class names to distinguish them more easily from unit test classes that only use the `Test` suffix; any particular reason for not doing so here? ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java: ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.util.clusters; + +import org.apache.kafka.connect.cli.ConnectStandalone; +import org.apache.kafka.connect.runtime.Connect; +import org.apache.kafka.test.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG; +import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG; +import static org.apache.kafka.connect.runtime.standalone.StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG; + +/** + * Start a standalone embedded connect worker. Internally, this class will spin up a Kafka and Zk cluster, + * setup any tmp directories and clean up them on them. Methods on the same Review Comment: > clean up them on them Was this supposed to be something like `clean them up on exit`? ## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectBuilder.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1360904810 ## tests/kafkatest/tests/connect/connect_distributed_test.py: ## @@ -375,6 +381,159 @@ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, met wait_until(lambda: self.is_paused(self.source, node), timeout_sec=120, err_msg="Failed to see connector startup in PAUSED state") +@cluster(num_nodes=5) +def test_dynamic_logging(self): +""" +Test out the REST API for dynamically adjusting logging levels, on both a single-worker and cluster-wide basis. +""" + +self.setup_services(num_workers=3) +self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) +self.cc.start() + +worker = self.cc.nodes[0] +prior_all_loggers = self.cc.get_all_loggers(worker) +self.logger.debug("Listed all loggers via REST API: %s", str(prior_all_loggers)) +assert prior_all_loggers is not None +assert 'root' in prior_all_loggers +# We need root and at least one other namespace (the other namespace is checked +# later on to make sure that it hasn't changed) +assert len(prior_all_loggers) >= 2 +for logger in prior_all_loggers.values(): +assert logger['last_modified'] is None + +namespace = None +for logger in prior_all_loggers.keys(): +if logger != 'root': +namespace = logger +break +assert namespace is not None + +initial_level = self.cc.get_logger(worker, namespace)['level'].upper() +# Make sure we pick a different one than what's already set for that namespace +new_level = 'INFO' if initial_level != 'INFO' else 'WARN' +self.cc.set_logger(worker, namespace, 'ERROR') +request_time = int(time.time() * 1000) +affected_loggers = self.cc.set_logger(worker, namespace, new_level) +assert len(affected_loggers) >= 1 +for logger in affected_loggers: +assert logger.startswith(namespace) + +assert self.loggers_set(new_level, request_time, namespace, workers=[worker]) +assert self.loggers_set(initial_level, None, namespace, 
workers=self.cc.nodes[1:]) + +# Force all loggers to get updated by setting the root namespace to +# two different levels +# This guarantees that their last-modified times will be updated +resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster') +assert resp is None + +new_root = 'INFO' +request_time = int(time.time() * 1000) +resp = self.cc.set_logger(worker, 'root', new_root, 'cluster') +assert resp is None +wait_until( +lambda: self.loggers_set(new_root, request_time), +# This should be super quick--just a write+read of the config topic, which workers are constantly polling +timeout_sec=10, +err_msg="Log level for root namespace was not adjusted in a reasonable amount of time." +) + +new_level = 'DEBUG' +request_time = int(time.time() * 1000) +resp = self.cc.set_logger(worker, namespace, new_level, 'cluster') +assert resp is None +wait_until( +lambda: self.loggers_set(new_level, request_time, namespace), +timeout_sec=10, +err_msg='Log level for namespace ' + namespace + ' was not adjusted in a reasonable amount of time.' +) + +prior_all_loggers = [self.cc.get_all_loggers(node) for node in self.cc.nodes] +resp = self.cc.set_logger(worker, namespace, new_level, 'cluster') +assert resp is None + +prior_namespace = namespace +new_namespace = None +for logger, level in prior_all_loggers[0].items(): +if logger != 'root' and not logger.startswith(namespace): Review Comment: That assertion (`num_loggers >= 3`) does not necessarily have to be true for this condition to be met, since altering a logging namespace will also cause child namespaces to be modified and begin to appear in the content of the `GET /admin/loggers` endpoint.
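To illustrate the point about child namespaces, here is a minimal sketch of why the number of listed loggers can grow after a single adjustment. It is not the PR's implementation: `NamespaceLevels`, `register`, `setLevel`, and `listed` are hypothetical names, plain strings stand in for log4j levels, and the prefix match simply mirrors the `logger.startswith(namespace)` check in the test above. It assumes, as `allLevels()` in the `Loggers` diff suggests, that only loggers with an explicitly set level are listed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of per-worker logger state; not the actual Loggers class.
class NamespaceLevels {

    // Every logger the worker knows about, whether or not it has an explicit level.
    private final Set<String> knownLoggers = new TreeSet<>();
    // Only loggers whose level was set explicitly; these are the ones that get listed.
    private final Map<String, String> explicitLevels = new TreeMap<>();

    // Record a logger that exists but only inherits its level from a parent.
    void register(String name) {
        knownLoggers.add(name);
    }

    // Set the level for a namespace and every known logger beneath it.
    // Returns the names of all loggers that were touched.
    List<String> setLevel(String namespace, String level) {
        knownLoggers.add(namespace);
        List<String> affected = new ArrayList<>();
        for (String name : knownLoggers) {
            // Assumption: plain prefix matching, as in the system test's startswith check.
            if (name.startsWith(namespace)) {
                explicitLevels.put(name, level);
                affected.add(name);
            }
        }
        return affected;
    }

    // Analogue of listing all loggers: inherited-only loggers are omitted.
    Map<String, String> listed() {
        return new TreeMap<>(explicitLevels);
    }
}
```

In this model, a child logger that previously inherited its level is absent from `listed()` until its parent namespace is adjusted, at which point it appears with an explicit level. That is why a fixed lower bound on the number of loggers is not required for the loop condition in the test to find a second namespace.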
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
yashmayya commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1362100106 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java: ## @@ -164,6 +171,13 @@ interface UpdateListener { * @param restartRequest the {@link RestartRequest restart request} */ void onRestartRequest(RestartRequest restartRequest); + +/** + * Invoked when a dynamic log level adjustment has been read Review Comment: Sounds good ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. 
Supports dynamic adjustment and querying + * of logging levels. + * + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + +private static final Logger log = LoggerFactory.getLogger(Loggers.class); + +/** + * Log4j uses "root" (case-insensitive) as name of the root logger. + */ +private static final String ROOT_LOGGER_NAME = "root"; + +private final Time time; +private final Map lastModifiedTimes; + +public Loggers(Time time) { +this.time = time; +this.lastModifiedTimes = new HashMap<>(); +} + +/** + * Retrieve the current level for a single logger. + * @param logger the name of the logger to retrieve the level for; may not be null + * @return the current level (falling back on the effective level if necessary) of the logger, + * or null if no logger with the specified name exists + */ +public synchronized LoggerLevel level(String logger) { +Objects.requireNonNull(logger, "Logger may not be null"); + +org.apache.log4j.Logger foundLogger = null; +if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) { +foundLogger = rootLogger(); +} else { +Enumeration en = currentLoggers(); +// search within existing loggers for the given name. +// using LogManger.getLogger() will create a logger if it doesn't exist +// (potential leak since these don't get cleaned up). 
+while (en.hasMoreElements()) { +org.apache.log4j.Logger l = en.nextElement(); +if (logger.equals(l.getName())) { +foundLogger = l; +break; +} +} +} + +if (foundLogger == null) { +log.warn("Unable to find level for logger {}", logger); +return null; +} + +return loggerLevel(foundLogger); +} + +/** + * Retrieve the current levels of all known loggers + * @return the levels of all known loggers; may be empty, but never null + */ +public synchronized Map allLevels() { +Map result = new TreeMap<>(); + +Enumeration enumeration = currentLoggers(); +Collections.list(enumeration) +.stream() +.filter(logger -> logger.getLevel() != null) +.forEach(logger -> result.put(logger.getName(), loggerLevel(logger))); + +org.apache.log4j.Logger root = rootLogger(); +if (root.getLevel() != null) { +result.put(ROOT_LOGGER_NAME, loggerLevel(root)); +} + +return result; +} + +/** + * Set the level for the specified logger and all of its children + * @param namespace the name of the logger to adjust along with its children; may not be nul + * @param level the level to set for the logger and its
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on PR #14538: URL: https://github.com/apache/kafka/pull/14538#issuecomment-1764882825 Ah, never mind--I realize that some static fields for timeouts from the `EmbeddedConnectClusterAssertions` class may be used in downstream projects. I've pushed a new commit with the suggested deprecated shim. Thanks again for catching this, Greg!
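For readers unfamiliar with the pattern, a deprecated shim of the kind mentioned here might look roughly like the sketch below. This is an assumption about the general technique, not the commit that was pushed: `NewAssertions`, `LegacyAssertions`, and `EXAMPLE_TIMEOUT_MS` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical new home for the shared timeout constants.
class NewAssertions {
    public static final long EXAMPLE_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(5);
}

/**
 * Hypothetical shim kept under the old class name so that downstream code
 * referencing the old static fields keeps compiling.
 *
 * @deprecated use {@link NewAssertions} instead
 */
@Deprecated
class LegacyAssertions {
    // Delegates to the new constant so the two values can never drift apart.
    @Deprecated
    public static final long EXAMPLE_TIMEOUT_MS = NewAssertions.EXAMPLE_TIMEOUT_MS;
}
```

Downstream callers that still reference the old field get a deprecation warning at compile time rather than a build failure, which gives them a release cycle to migrate.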
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1360788058 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying + * of logging levels. + * + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + +private static final Logger log = LoggerFactory.getLogger(Loggers.class); + +/** + * Log4j uses "root" (case-insensitive) as name of the root logger. 
+ */ +private static final String ROOT_LOGGER_NAME = "root"; + +private final Time time; +private final Map lastModifiedTimes; + +public Loggers(Time time) { +this.time = time; +this.lastModifiedTimes = new HashMap<>(); +} + +/** + * Retrieve the current level for a single logger. + * @param logger the name of the logger to retrieve the level for; may not be null + * @return the current level (falling back on the effective level if necessary) of the logger, + * or null if no logger with the specified name exists + */ +public synchronized LoggerLevel level(String logger) { Review Comment: Yeah, the synchronization here is fairly coarse-grained but I don't think it's likely to be a bottleneck (who's going to be issuing hundreds of dynamic log requests a second?). It's also necessary for this read-only operation to have some synchronization in order to ensure that log levels and their last-modified times are kept in sync.
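The reasoning about synchronizing a read-only method can be sketched as follows. This is a simplified stand-in, not the `Loggers` class from the diff: `SynchronizedLevels`, `Snapshot`, and `setLevel` are hypothetical names, and plain strings replace log4j levels so that the example is self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a class that tracks levels and last-modified times.
class SynchronizedLevels {

    // Immutable pairing of a level with the time it was last changed.
    static final class Snapshot {
        final String level;
        final Long lastModified; // null if the level was never changed dynamically

        Snapshot(String level, Long lastModified) {
            this.level = level;
            this.lastModified = lastModified;
        }
    }

    private final Map<String, String> levels = new HashMap<>();
    private final Map<String, Long> lastModifiedTimes = new HashMap<>();

    // Both maps change under one lock, so no reader sees a new level with a stale timestamp.
    synchronized void setLevel(String logger, String level, long nowMs) {
        levels.put(logger, level);
        lastModifiedTimes.put(logger, nowMs);
    }

    // Read-only, yet synchronized: the level and timestamp must come from the same update.
    synchronized Snapshot level(String logger) {
        String level = levels.get(logger);
        if (level == null) {
            return null; // unknown logger
        }
        return new Snapshot(level, lastModifiedTimes.get(logger));
    }
}
```

Without the lock on the read path, a reader could interleave between the two `put` calls in `setLevel` and return the new level paired with the old timestamp. A single coarse lock rules that out at the cost of serializing calls, which seems acceptable given how rarely log levels are expected to be adjusted.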
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1360768631 ## checkstyle/suppressions.xml: ## @@ -139,6 +139,8 @@ files="Worker(SinkTask|SourceTask|Coordinator).java"/> + Review Comment: Switching on enums still requires a default case; there's a good explanation on [StackOverflow](https://stackoverflow.com/questions/5013194/why-is-default-required-for-a-switch-on-an-enum) for why. ## tests/kafkatest/tests/connect/connect_distributed_test.py: ## @@ -375,6 +381,159 @@ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, met wait_until(lambda: self.is_paused(self.source, node), timeout_sec=120, err_msg="Failed to see connector startup in PAUSED state") +@cluster(num_nodes=5) +def test_dynamic_logging(self): +""" +Test out the REST API for dynamically adjusting logging levels, on both a single-worker and cluster-wide basis. +""" + +self.setup_services(num_workers=3) +self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) +self.cc.start() + +worker = self.cc.nodes[0] +prior_all_loggers = self.cc.get_all_loggers(worker) +self.logger.debug("Listed all loggers via REST API: %s", str(prior_all_loggers)) +assert prior_all_loggers is not None +assert 'root' in prior_all_loggers +# We need root and at least one other namespace (the other namespace is checked +# later on to make sure that it hasn't changed) +assert len(prior_all_loggers) >= 2 +for logger in prior_all_loggers.values(): +assert logger['last_modified'] is None + +namespace = None +for logger in prior_all_loggers.keys(): +if logger != 'root': +namespace = logger +break +assert namespace is not None + +initial_level = self.cc.get_logger(worker, namespace)['level'].upper() +# Make sure we pick a different one than what's already set for that namespace +new_level = 'INFO' if initial_level != 'INFO' else 'WARN' +self.cc.set_logger(worker, namespace, 'ERROR') +request_time = int(time.time() * 1000) 
+affected_loggers = self.cc.set_logger(worker, namespace, new_level) +assert len(affected_loggers) >= 1 +for logger in affected_loggers: +assert logger.startswith(namespace) + +assert self.loggers_set(new_level, request_time, namespace, workers=[worker]) +assert self.loggers_set(initial_level, None, namespace, workers=self.cc.nodes[1:]) + +# Force all loggers to get updated by setting the root namespace to +# two different levels +# This guarantees that their last-modified times will be updated +resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster') +assert resp is None + +new_root = 'INFO' +request_time = int(time.time() * 1000) +resp = self.cc.set_logger(worker, 'root', new_root, 'cluster') +assert resp is None +wait_until( +lambda: self.loggers_set(new_root, request_time), +# This should be super quick--just a write+read of the config topic, which workers are constantly polling +timeout_sec=10, +err_msg="Log level for root namespace was not adjusted in a reasonable amount of time." +) + +new_level = 'DEBUG' +request_time = int(time.time() * 1000) +resp = self.cc.set_logger(worker, namespace, new_level, 'cluster') +assert resp is None +wait_until( +lambda: self.loggers_set(new_level, request_time, namespace), +timeout_sec=10, +err_msg='Log level for namespace ' + namespace + ' was not adjusted in a reasonable amount of time.' +) + +prior_all_loggers = [self.cc.get_all_loggers(node) for node in self.cc.nodes] +resp = self.cc.set_logger(worker, namespace, new_level, 'cluster') +assert resp is None + +prior_namespace = namespace +new_namespace = None +for logger, level in prior_all_loggers[0].items(): +if logger != 'root' and not logger.startswith(namespace): Review Comment: That assertion (`num_loggers >= 3`) does not necessarily have to be true for this condition to be met, since altering a logging namespace will also cause child namespaces to be modified and begin to appear in the content of the `GET /admin/loggers` endpoint. 
## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -297,8 +297,8 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce // When automatic topic creation is disabled on the broker brokerProps.put("auto.cr
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
yashmayya commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1360494846 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying + * of logging levels. + * + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + +private static final Logger log = LoggerFactory.getLogger(Loggers.class); + +/** + * Log4j uses "root" (case-insensitive) as name of the root logger. 
+ */ +private static final String ROOT_LOGGER_NAME = "root"; + +private final Time time; +private final Map lastModifiedTimes; + +public Loggers(Time time) { +this.time = time; +this.lastModifiedTimes = new HashMap<>(); +} + +/** + * Retrieve the current level for a single logger. + * @param logger the name of the logger to retrieve the level for; may not be null + * @return the current level (falling back on the effective level if necessary) of the logger, + * or null if no logger with the specified name exists + */ +public synchronized LoggerLevel level(String logger) { +Objects.requireNonNull(logger, "Logger may not be null"); + +org.apache.log4j.Logger foundLogger = null; +if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) { +foundLogger = rootLogger(); +} else { +Enumeration en = currentLoggers(); +// search within existing loggers for the given name. +// using LogManger.getLogger() will create a logger if it doesn't exist +// (potential leak since these don't get cleaned up). +while (en.hasMoreElements()) { +org.apache.log4j.Logger l = en.nextElement(); +if (logger.equals(l.getName())) { +foundLogger = l; +break; +} +} +} + +if (foundLogger == null) { +log.warn("Unable to find level for logger {}", logger); +return null; +} + +return loggerLevel(foundLogger); +} + +/** + * Retrieve the current levels of all known loggers + * @return the levels of all known loggers; may be empty, but never null + */ +public synchronized Map allLevels() { +Map result = new TreeMap<>(); + +Enumeration enumeration = currentLoggers(); +Collections.list(enumeration) +.stream() +.filter(logger -> logger.getLevel() != null) +.forEach(logger -> result.put(logger.getName(), loggerLevel(logger))); + +org.apache.log4j.Logger root = rootLogger(); +if (root.getLevel() != null) { +result.put(ROOT_LOGGER_NAME, loggerLevel(root)); +} + +return result; +} + +/** + * Set the level for the specified logger and all of its children + * @param namespace the name of the logger to adjust along with its 
children; may not be nul Review Comment: ```suggestion * @param namespace the name of the logger to adjust along with its children; may not be null ``` nit ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ## @@ -917,4 +924,27 @@ public void resetConnectorOffsets(String connName, Callback callback) { * @param cb callback to invoke upon completion */ protected abstract void modifyConnectorOffsets(String connName, Map, M
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
gharris1727 commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1358597074 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying + * of logging levels. + * + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + +private static final Logger log = LoggerFactory.getLogger(Loggers.class); + +/** + * Log4j uses "root" (case-insensitive) as name of the root logger. 
+ */ +private static final String ROOT_LOGGER_NAME = "root"; + +private final Time time; +private final Map lastModifiedTimes; + +public Loggers(Time time) { +this.time = time; +this.lastModifiedTimes = new HashMap<>(); +} + +/** + * Retrieve the current level for a single logger. + * @param logger the name of the logger to retrieve the level for; may not be null + * @return the current level (falling back on the effective level if necessary) of the logger, + * or null if no logger with the specified name exists + */ +public synchronized LoggerLevel level(String logger) { Review Comment: @yangy Writes must have an exclusive lock for thread safety, meaning that no concurrent reads are allowed while a write is happening. The synchronized lock here makes sure that no writes are ongoing. It's a bit stronger than necessary: multiple concurrent readers could be allowed, but multiple readers sharing a lock is a little bit more complex, and probably not worth the performance difference. These methods are not called in any hotpath. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
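For readers following the locking discussion in the comment above, here is a minimal sketch of the alternative it mentions: letting multiple readers share a lock while writers remain exclusive. It is not code from the PR. `LevelRegistry` and its plain map of level names are hypothetical stand-ins for `Loggers` and Log4j, and the only library calls used are from `java.util.concurrent.locks`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Hypothetical stand-in for the class under review: it keeps level names in a
 * plain map instead of talking to Log4j, so only the locking pattern is shown.
 */
class LevelRegistry {
    private final Map<String, String> levels = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    /** Read path: any number of threads may hold the read lock at once. */
    String level(String logger) {
        lock.readLock().lock();
        try {
            return levels.get(logger);
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Write path: the write lock excludes readers and other writers. */
    void setLevel(String logger, String level) {
        lock.writeLock().lock();
        try {
            levels.put(logger, level);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Compared with marking both methods `synchronized`, this needs a second lock object and a try/finally in every method, which is the extra complexity the comment weighs against the performance gain for methods that are not on a hot path.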
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
yangy commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1357785014 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying + * of logging levels. + * + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + +private static final Logger log = LoggerFactory.getLogger(Loggers.class); + +/** + * Log4j uses "root" (case-insensitive) as name of the root logger. 
+ */ +private static final String ROOT_LOGGER_NAME = "root"; + +private final Time time; +private final Map lastModifiedTimes; + +public Loggers(Time time) { +this.time = time; +this.lastModifiedTimes = new HashMap<>(); +} + +/** + * Retrieve the current level for a single logger. + * @param logger the name of the logger to retrieve the level for; may not be null + * @return the current level (falling back on the effective level if necessary) of the logger, + * or null if no logger with the specified name exists + */ +public synchronized LoggerLevel level(String logger) { Review Comment: It looks like this is a read-only method; what's the reason for making it synchronized?
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
yangy commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1357779469 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java: ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +/** + * Manges logging levels on a single worker. Supports dynamic adjustment and querying + * of logging levels. + * + * This class is thread-safe; concurrent calls to all of its public methods from any number + * of threads are permitted. + */ +public class Loggers { + +private static final Logger log = LoggerFactory.getLogger(Loggers.class); + +/** + * Log4j uses "root" (case-insensitive) as name of the root logger. 
+ */ +private static final String ROOT_LOGGER_NAME = "root"; + +private final Time time; +private final Map lastModifiedTimes; + +public Loggers(Time time) { +this.time = time; +this.lastModifiedTimes = new HashMap<>(); +} + +/** + * Retrieve the current level for a single logger. Review Comment: nit: how about using "3rd person declarative" for the Javadoc? Ref: https://www.oracle.com/technical-resources/articles/java/javadoc-tool.html "The description is in 3rd person declarative rather than 2nd person imperative."
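As an illustration of the Javadoc nit above, the following sketch shows how the description might read in 3rd person declarative form. It is only an assumption about what the reviewer has in mind. `JavadocStyleExample` is a hypothetical miniature class, not the PR's `Loggers`, and the map lookup stands in for the real Log4j query.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical miniature class, used only to show the Javadoc wording. */
class JavadocStyleExample {
    private final Map<String, String> levels = new HashMap<>();

    JavadocStyleExample() {
        levels.put("root", "INFO");
    }

    // 2nd person imperative, as in the diff under review:
    //   "Retrieve the current level for a single logger."
    // 3rd person declarative, as the Oracle guide recommends:

    /**
     * Retrieves the current level for a single logger.
     *
     * @param logger the name of the logger to retrieve the level for; may not be null
     * @return the current level of the logger, or null if no logger with the specified name exists
     */
    String level(String logger) {
        return levels.get(logger);
    }
}
```

Only the leading verb of the description changes; the `@param` and `@return` tags are unaffected by the guideline quoted above.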
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
gharris1727 commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1357258143 ## checkstyle/suppressions.xml: ## @@ -139,6 +139,8 @@ files="Worker(SinkTask|SourceTask|Coordinator).java"/> + Review Comment: nit: we could probably avoid this warning with enum parsing and a fallback enum. ## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ## @@ -297,8 +297,8 @@ public void testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce // When automatic topic creation is disabled on the broker brokerProps.put("auto.create.topics.enable", "false"); connect = connectBuilder -.brokerProps(brokerProps) .numWorkers(1) +.brokerProps(brokerProps) Review Comment: nit: is this necessary? ## tests/kafkatest/tests/connect/connect_distributed_test.py: ## @@ -375,6 +381,159 @@ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, met wait_until(lambda: self.is_paused(self.source, node), timeout_sec=120, err_msg="Failed to see connector startup in PAUSED state") +@cluster(num_nodes=5) +def test_dynamic_logging(self): +""" +Test out the REST API for dynamically adjusting logging levels, on both a single-worker and cluster-wide basis. 
+""" + +self.setup_services(num_workers=3) +self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) +self.cc.start() + +worker = self.cc.nodes[0] +prior_all_loggers = self.cc.get_all_loggers(worker) +self.logger.debug("Listed all loggers via REST API: %s", str(prior_all_loggers)) +assert prior_all_loggers is not None +assert 'root' in prior_all_loggers +# We need root and at least one other namespace (the other namespace is checked +# later on to make sure that it hasn't changed) +assert len(prior_all_loggers) >= 2 +for logger in prior_all_loggers.values(): +assert logger['last_modified'] is None + +namespace = None +for logger in prior_all_loggers.keys(): +if logger != 'root': +namespace = logger +break +assert namespace is not None + +initial_level = self.cc.get_logger(worker, namespace)['level'].upper() +# Make sure we pick a different one than what's already set for that namespace +new_level = 'INFO' if initial_level != 'INFO' else 'WARN' +self.cc.set_logger(worker, namespace, 'ERROR') +request_time = int(time.time() * 1000) +affected_loggers = self.cc.set_logger(worker, namespace, new_level) +assert len(affected_loggers) >= 1 +for logger in affected_loggers: +assert logger.startswith(namespace) + +assert self.loggers_set(new_level, request_time, namespace, workers=[worker]) +assert self.loggers_set(initial_level, None, namespace, workers=self.cc.nodes[1:]) + +# Force all loggers to get updated by setting the root namespace to +# two different levels +# This guarantees that their last-modified times will be updated +resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster') +assert resp is None + +new_root = 'INFO' +request_time = int(time.time() * 1000) +resp = self.cc.set_logger(worker, 'root', new_root, 'cluster') +assert resp is None +wait_until( +lambda: self.loggers_set(new_root, request_time), +# This should be super quick--just a write+read of the config topic, which workers are constantly polling +timeout_sec=10, 
+err_msg="Log level for root namespace was not adjusted in a reasonable amount of time." +) + +new_level = 'DEBUG' +request_time = int(time.time() * 1000) +resp = self.cc.set_logger(worker, namespace, new_level, 'cluster') +assert resp is None +wait_until( +lambda: self.loggers_set(new_level, request_time, namespace), +timeout_sec=10, +err_msg='Log level for namespace ' + namespace + ' was not adjusted in a reasonable amount of time.' +) + +prior_all_loggers = [self.cc.get_all_loggers(node) for node in self.cc.nodes] +resp = self.cc.set_logger(worker, namespace, new_level, 'cluster') +assert resp is None + +prior_namespace = namespace +new_namespace = None +for logger, level in prior_all_loggers[0].items(): +if logger != 'root' and not logger.startswith(namespace): Review Comment: In order for this to be true, there must be at least two non-overlapping namespaces not including root. With root included, does that mean that the earlier `ass
Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]
C0urante commented on PR #14538: URL: https://github.com/apache/kafka/pull/14538#issuecomment-1760004960 @gharris1727, @yashmayya, @mimaison would any of you be able to take a look?