Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-30 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1360796008


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * 
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+/**
+ * Log4j uses "root" (case-insensitive) as name of the root logger.
+ */
+private static final String ROOT_LOGGER_NAME = "root";
+
+private final Time time;
+private final Map lastModifiedTimes;
+
+public Loggers(Time time) {
+this.time = time;
+this.lastModifiedTimes = new HashMap<>();
+}
+
+/**
+ * Retrieve the current level for a single logger.

Review Comment:
   It doesn't seem like we follow this convention in the code base (see the 
[ConnectMetrics](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java),
 
[KafkaConfigBackingStore](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java),
 and 
[AbstractConnectCli](https://github.com/apache/kafka/blob/1073d434ec98e64afca1979cd18d3244b133e688/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java)
 classes for a few examples).
   
   Do you find it personally easier to read with this style? I don't think we 
need to adhere to the recommendations of Oracle on writing Javadocs if there 
isn't tangible benefit to doing so, but if it does make a difference to you or 
someone else, I'm happy to tweak the Javadocs on this and all other 
newly-introduced classes in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-20 Thread via GitHub


C0urante merged PR #14538:
URL: https://github.com/apache/kafka/pull/14538


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-20 Thread via GitHub


C0urante commented on PR #14538:
URL: https://github.com/apache/kafka/pull/14538#issuecomment-177221

   Thanks for the reviews, all!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-18 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1364507968


##
tests/kafkatest/tests/connect/connect_distributed_test.py:
##
@@ -375,6 +381,159 @@ def test_pause_state_persistent(self, 
exactly_once_source, connect_protocol, met
 wait_until(lambda: self.is_paused(self.source, node), 
timeout_sec=120,
err_msg="Failed to see connector startup in PAUSED 
state")
 
+@cluster(num_nodes=5)
+def test_dynamic_logging(self):
+"""
+Test out the REST API for dynamically adjusting logging levels, on 
both a single-worker and cluster-wide basis.
+"""
+
+self.setup_services(num_workers=3)
+self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
+self.cc.start()
+
+worker = self.cc.nodes[0]
+prior_all_loggers = self.cc.get_all_loggers(worker)
+self.logger.debug("Listed all loggers via REST API: %s", 
str(prior_all_loggers))
+assert prior_all_loggers is not None
+assert 'root' in prior_all_loggers
+# We need root and at least one other namespace (the other namespace 
is checked
+# later on to make sure that it hasn't changed)
+assert len(prior_all_loggers) >= 2
+for logger in prior_all_loggers.values():
+assert logger['last_modified'] is None
+
+namespace = None
+for logger in prior_all_loggers.keys():
+if logger != 'root':
+namespace = logger
+break
+assert namespace is not None
+
+initial_level = self.cc.get_logger(worker, namespace)['level'].upper()
+# Make sure we pick a different one than what's already set for that 
namespace
+new_level = 'INFO' if initial_level != 'INFO' else 'WARN'
+self.cc.set_logger(worker, namespace, 'ERROR')
+request_time = int(time.time() * 1000)
+affected_loggers = self.cc.set_logger(worker, namespace, new_level)
+assert len(affected_loggers) >= 1
+for logger in affected_loggers:
+assert logger.startswith(namespace)
+
+assert self.loggers_set(new_level, request_time, namespace, 
workers=[worker])
+assert self.loggers_set(initial_level, None, namespace, 
workers=self.cc.nodes[1:])
+
+# Force all loggers to get updated by setting the root namespace to
+# two different levels
+# This guarantees that their last-modified times will be updated
+resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster')
+assert resp is None
+
+new_root = 'INFO'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, 'root', new_root, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_root, request_time),
+# This should be super quick--just a write+read of the config 
topic, which workers are constantly polling
+timeout_sec=10,
+err_msg="Log level for root namespace was not adjusted in a 
reasonable amount of time."
+)
+
+new_level = 'DEBUG'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_level, request_time, namespace),
+timeout_sec=10,
+err_msg='Log level for namespace ' + namespace + ' was not 
adjusted in a reasonable amount of time.'
+)
+
+prior_all_loggers = [self.cc.get_all_loggers(node) for node in 
self.cc.nodes]
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+
+prior_namespace = namespace
+new_namespace = None
+for logger, level in prior_all_loggers[0].items():
+if logger != 'root' and not logger.startswith(namespace):

Review Comment:
   This is true for the modifications we made to `namespace`, but not the ones 
we made to the root logger (I should have clarified this in my initial 
response, sorry).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-18 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1364507968


##
tests/kafkatest/tests/connect/connect_distributed_test.py:
##
@@ -375,6 +381,159 @@ def test_pause_state_persistent(self, 
exactly_once_source, connect_protocol, met
 wait_until(lambda: self.is_paused(self.source, node), 
timeout_sec=120,
err_msg="Failed to see connector startup in PAUSED 
state")
 
+@cluster(num_nodes=5)
+def test_dynamic_logging(self):
+"""
+Test out the REST API for dynamically adjusting logging levels, on 
both a single-worker and cluster-wide basis.
+"""
+
+self.setup_services(num_workers=3)
+self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
+self.cc.start()
+
+worker = self.cc.nodes[0]
+prior_all_loggers = self.cc.get_all_loggers(worker)
+self.logger.debug("Listed all loggers via REST API: %s", 
str(prior_all_loggers))
+assert prior_all_loggers is not None
+assert 'root' in prior_all_loggers
+# We need root and at least one other namespace (the other namespace 
is checked
+# later on to make sure that it hasn't changed)
+assert len(prior_all_loggers) >= 2
+for logger in prior_all_loggers.values():
+assert logger['last_modified'] is None
+
+namespace = None
+for logger in prior_all_loggers.keys():
+if logger != 'root':
+namespace = logger
+break
+assert namespace is not None
+
+initial_level = self.cc.get_logger(worker, namespace)['level'].upper()
+# Make sure we pick a different one than what's already set for that 
namespace
+new_level = 'INFO' if initial_level != 'INFO' else 'WARN'
+self.cc.set_logger(worker, namespace, 'ERROR')
+request_time = int(time.time() * 1000)
+affected_loggers = self.cc.set_logger(worker, namespace, new_level)
+assert len(affected_loggers) >= 1
+for logger in affected_loggers:
+assert logger.startswith(namespace)
+
+assert self.loggers_set(new_level, request_time, namespace, 
workers=[worker])
+assert self.loggers_set(initial_level, None, namespace, 
workers=self.cc.nodes[1:])
+
+# Force all loggers to get updated by setting the root namespace to
+# two different levels
+# This guarantees that their last-modified times will be updated
+resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster')
+assert resp is None
+
+new_root = 'INFO'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, 'root', new_root, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_root, request_time),
+# This should be super quick--just a write+read of the config 
topic, which workers are constantly polling
+timeout_sec=10,
+err_msg="Log level for root namespace was not adjusted in a 
reasonable amount of time."
+)
+
+new_level = 'DEBUG'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_level, request_time, namespace),
+timeout_sec=10,
+err_msg='Log level for namespace ' + namespace + ' was not 
adjusted in a reasonable amount of time.'
+)
+
+prior_all_loggers = [self.cc.get_all_loggers(node) for node in 
self.cc.nodes]
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+
+prior_namespace = namespace
+new_namespace = None
+for logger, level in prior_all_loggers[0].items():
+if logger != 'root' and not logger.startswith(namespace):

Review Comment:
   This is true for the modifications we made to `namespace`, but not the ones 
we made to the root logger.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-18 Thread via GitHub


gharris1727 commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1364474243


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying

Review Comment:
   ```suggestion
* Manages logging levels on a single worker. Supports dynamic adjustment 
and querying
   ```



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java:
##
@@ -16,603 +16,14 @@
  */
 package org.apache.kafka.connect.util.clusters;
 
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.connect.runtime.AbstractStatus;
-import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
-import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.Response;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiFunction;
-import java.util.stream.Collectors;
-
-import static org.apache.kafka.test.TestUtils.waitForCondition;
-
 /**
- * A set of common assertions that can be applied to a Connect cluster during 
integration testing
+ * @deprecated Use {@link ConnectAssertions} instead.
  */
-public class EmbeddedConnectClusterAssertions {
-
-private static final Logger log = 
LoggerFactory.getLogger(EmbeddedConnectClusterAssertions.class);
-public static final long WORKER_SETUP_DURATION_MS = 
TimeUnit.MINUTES.toMillis(5);
-public static final long VALIDATION_DURATION_MS = 
TimeUnit.SECONDS.toMillis(30);
-public static final long CONNECTOR_SETUP_DURATION_MS = 
TimeUnit.MINUTES.toMillis(2);
-// Creating a connector requires two rounds of rebalance; destroying one 
only requires one
-// Assume it'll take ~half the time to destroy a connector as it does to 
create one
-public static final long CONNECTOR_SHUTDOWN_DURATION_MS = 
TimeUnit.MINUTES.toMillis(1);
-private static final long CONNECT_INTERNAL_TOPIC_UPDATES_DURATION_MS = 
TimeUnit.SECONDS.toMillis(60);
-
-private final EmbeddedConnectCluster connect;
-
-EmbeddedConnectClusterAssertions(EmbeddedConnectCluster connect) {
-this.connect = connect;
-}
-
-/**
- * Assert that at least the requested number of workers are up and running.
- *
- * @param numWorkers the number of online workers
- */
-public void assertAtLeastNumWorkersAreUp(int numWorkers, String 
detailMessage) throws InterruptedException {
-try {
-waitForCondition(
-() -> checkWorkersUp(numWorkers, (actual, expected) -> actual 
>= expected).orElse(false),
-WORKER_SETUP_DURATION_MS,
-"Didn't meet the minimum requested number of online workers: " 
+ numWorkers);
-} catch (AssertionError e) {
-throw new AssertionError(detailMessage, e);
-}
-}
-
-/**
- * Assert that at least the requested number of workers are up and running.
- *
- * @param numWorkers the number of online workers
- */
-public void assertExactlyNumWorkersAreUp(int numWorkers, String 
detailMessage) throws InterruptedException {
-try {
-waitForCondition(
-() -> ch

Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-18 Thread via GitHub


gharris1727 commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1364457214


##
tests/kafkatest/tests/connect/connect_distributed_test.py:
##
@@ -375,6 +381,159 @@ def test_pause_state_persistent(self, 
exactly_once_source, connect_protocol, met
 wait_until(lambda: self.is_paused(self.source, node), 
timeout_sec=120,
err_msg="Failed to see connector startup in PAUSED 
state")
 
+@cluster(num_nodes=5)
+def test_dynamic_logging(self):
+"""
+Test out the REST API for dynamically adjusting logging levels, on 
both a single-worker and cluster-wide basis.
+"""
+
+self.setup_services(num_workers=3)
+self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
+self.cc.start()
+
+worker = self.cc.nodes[0]
+prior_all_loggers = self.cc.get_all_loggers(worker)
+self.logger.debug("Listed all loggers via REST API: %s", 
str(prior_all_loggers))
+assert prior_all_loggers is not None
+assert 'root' in prior_all_loggers
+# We need root and at least one other namespace (the other namespace 
is checked
+# later on to make sure that it hasn't changed)
+assert len(prior_all_loggers) >= 2
+for logger in prior_all_loggers.values():
+assert logger['last_modified'] is None
+
+namespace = None
+for logger in prior_all_loggers.keys():
+if logger != 'root':
+namespace = logger
+break
+assert namespace is not None
+
+initial_level = self.cc.get_logger(worker, namespace)['level'].upper()
+# Make sure we pick a different one than what's already set for that 
namespace
+new_level = 'INFO' if initial_level != 'INFO' else 'WARN'
+self.cc.set_logger(worker, namespace, 'ERROR')
+request_time = int(time.time() * 1000)
+affected_loggers = self.cc.set_logger(worker, namespace, new_level)
+assert len(affected_loggers) >= 1
+for logger in affected_loggers:
+assert logger.startswith(namespace)
+
+assert self.loggers_set(new_level, request_time, namespace, 
workers=[worker])
+assert self.loggers_set(initial_level, None, namespace, 
workers=self.cc.nodes[1:])
+
+# Force all loggers to get updated by setting the root namespace to
+# two different levels
+# This guarantees that their last-modified times will be updated
+resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster')
+assert resp is None
+
+new_root = 'INFO'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, 'root', new_root, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_root, request_time),
+# This should be super quick--just a write+read of the config 
topic, which workers are constantly polling
+timeout_sec=10,
+err_msg="Log level for root namespace was not adjusted in a 
reasonable amount of time."
+)
+
+new_level = 'DEBUG'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_level, request_time, namespace),
+timeout_sec=10,
+err_msg='Log level for namespace ' + namespace + ' was not 
adjusted in a reasonable amount of time.'
+)
+
+prior_all_loggers = [self.cc.get_all_loggers(node) for node in 
self.cc.nodes]
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+
+prior_namespace = namespace
+new_namespace = None
+for logger, level in prior_all_loggers[0].items():
+if logger != 'root' and not logger.startswith(namespace):

Review Comment:
   > altering a logging namespace will also cause child namespaces to be 
modified and begin to appear in the content of the GET /admin/loggers endpoint.
   
   Don't those get filtered out by the `startswith(namespace)` check? 
`new_namespace` can't be a child of the `namespace` which was picked initially. 
There's an assertion that a `new_namespace` is found later, so this isn't a 
problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-17 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1362527136


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -72,115 +41,42 @@
 import static 
org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG;
 
 /**
- * Start an embedded connect worker. Internally, this class will spin up a 
Kafka and Zk cluster, setup any tmp
- * directories and clean up them on them. Methods on the same {@code 
EmbeddedConnectCluster} are
+ * Start an embedded connect cluster. Internally, this class will spin up a 
Kafka and Zk cluster, set up any tmp
+ * directories, and clean them up on exit. Methods on the same {@code 
EmbeddedConnectCluster} are
  * not guaranteed to be thread-safe.
  */
-public class EmbeddedConnectCluster {
+public class EmbeddedConnectCluster extends EmbeddedConnect {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedConnectCluster.class);
 
-public static final int DEFAULT_NUM_BROKERS = 1;
 public static final int DEFAULT_NUM_WORKERS = 1;
-private static final Properties DEFAULT_BROKER_CONFIG = new Properties();
 private static final String REST_HOST_NAME = "localhost";
 
 private static final String DEFAULT_WORKER_NAME_PREFIX = "connect-worker-";
 
 private final Set connectCluster;
-private final EmbeddedKafkaCluster kafkaCluster;
-private final HttpClient httpClient;
 private final Map workerProps;
 private final String connectClusterName;
-private final int numBrokers;
 private final int numInitialWorkers;
-private final boolean maskExitProcedures;
 private final String workerNamePrefix;
 private final AtomicInteger nextWorkerId = new AtomicInteger(0);
-private final EmbeddedConnectClusterAssertions assertions;
-// we should keep the original class loader and set it back after 
connector stopped since the connector will change the class loader,
-// and then, the Mockito will use the unexpected class loader to generate 
the wrong proxy instance, which makes mock failed
-private final ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
 
-private EmbeddedConnectCluster(String name, Map 
workerProps, int numWorkers,
-   int numBrokers, Properties brokerProps,
-   boolean maskExitProcedures,
-   Map 
additionalKafkaClusterClientConfigs) {
+private EmbeddedConnectCluster(
+int numBrokers,
+Properties brokerProps,
+boolean maskExitProcedures,
+Map clientProps,
+Map workerProps,
+String name,
+int numWorkers
+) {
+super(numBrokers, brokerProps, maskExitProcedures, clientProps);
 this.workerProps = workerProps;
 this.connectClusterName = name;
-this.numBrokers = numBrokers;
-this.kafkaCluster = new EmbeddedKafkaCluster(numBrokers, brokerProps, 
additionalKafkaClusterClientConfigs);
 this.connectCluster = new LinkedHashSet<>();
-this.httpClient = new HttpClient();
 this.numInitialWorkers = numWorkers;
-this.maskExitProcedures = maskExitProcedures;
 // leaving non-configurable for now
 this.workerNamePrefix = DEFAULT_WORKER_NAME_PREFIX;
-this.assertions = new EmbeddedConnectClusterAssertions(this);
-}
-
-/**
- * A more graceful way to handle abnormal exit of services in integration 
tests.
- */
-public Exit.Procedure exitProcedure = (code, message) -> {
-if (code != 0) {
-String exitMessage = "Abrupt service exit with code " + code + " 
and message " + message;
-log.warn(exitMessage);
-throw new UngracefulShutdownException(exitMessage);
-}
-};
-
-/**
- * A more graceful way to handle abnormal halt of services in integration 
tests.
- */
-public Exit.Procedure haltProcedure = (code, message) -> {
-if (code != 0) {
-String haltMessage = "Abrupt service halt with code " + code + " 
and message " + message;
-log.warn(haltMessage);
-throw new UngracefulShutdownException(haltMessage);
-}
-};
-
-/**
- * Start the connect cluster and the embedded Kafka and Zookeeper cluster.
- */
-public void start() {
-if (maskExitProcedures) {
-Exit.setExitProcedure(exitProcedure);
-Exit.setHaltProcedure(haltProcedure);
-}
-kafkaCluster.start();
-startConnect();
-try {
-httpClient.start();
-} catch (Exception e) {
-throw new ConnectException("Failed to start HTTP client", e);
-}
-}
-
-/**
- * Stop the connect cluster and the embedded Kafka and Zookeeper cluster.
- * Clean up any temp directories creat

Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-17 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1362530005


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -332,722 +186,46 @@ public Set workers() {
 return new LinkedHashSet<>(connectCluster);
 }
 
-/**
- * Configure a connector. If the connector does not already exist, a new 
one will be created and
- * the given configuration will be applied to it.
- *
- * @param connName   the name of the connector
- * @param connConfig the intended configuration
- * @throws ConnectRestException if the REST api returns error status
- * @throws ConnectException if the configuration fails to be serialized or 
if the request could not be sent
- */
-public String configureConnector(String connName, Map 
connConfig) {
-String url = endpointForResource(String.format("connectors/%s/config", 
connName));
-return putConnectorConfig(url, connConfig);
-}
-
-/**
- * Validate a given connector configuration. If the configuration 
validates or
- * has a configuration error, an instance of {@link ConfigInfos} is 
returned. If the validation fails
- * an exception is thrown.
- *
- * @param connClassName the name of the connector class
- * @param connConfigthe intended configuration
- * @throws ConnectRestException if the REST api returns error status
- * @throws ConnectException if the configuration fails to 
serialize/deserialize or if the request failed to send
- */
-public ConfigInfos validateConnectorConfig(String connClassName, 
Map connConfig) {
-String url = 
endpointForResource(String.format("connector-plugins/%s/config/validate", 
connClassName));
-String response = putConnectorConfig(url, connConfig);
-ConfigInfos configInfos;
-try {
-configInfos = new ObjectMapper().readValue(response, 
ConfigInfos.class);
-} catch (IOException e) {
-throw new ConnectException("Unable deserialize response into a 
ConfigInfos object");
-}
-return configInfos;
-}
-
-/**
- * Execute a PUT request with the given connector configuration on the 
given URL endpoint.
- *
- * @param urlthe full URL of the endpoint that corresponds to the 
given REST resource
- * @param connConfig the intended configuration
- * @throws ConnectRestException if the REST api returns error status
- * @throws ConnectException if the configuration fails to be serialized or 
if the request could not be sent
- */
-protected String putConnectorConfig(String url, Map 
connConfig) {
-ObjectMapper mapper = new ObjectMapper();
-String content;
-try {
-content = mapper.writeValueAsString(connConfig);
-} catch (IOException e) {
-throw new ConnectException("Could not serialize connector 
configuration and execute PUT request");
-}
-Response response = requestPut(url, content);
-if (response.getStatus() < 
Response.Status.BAD_REQUEST.getStatusCode()) {
-return responseToString(response);
-}
-throw new ConnectRestException(response.getStatus(),
-"Could not execute PUT request. Error response: " + 
responseToString(response));
-}
-
-/**
- * Delete an existing connector.
- *
- * @param connName name of the connector to be deleted
- * @throws ConnectRestException if the REST API returns error status
- * @throws ConnectException for any other error.
- */
-public void deleteConnector(String connName) {
-String url = endpointForResource(String.format("connectors/%s", 
connName));
-Response response = requestDelete(url);
-if (response.getStatus() >= 
Response.Status.BAD_REQUEST.getStatusCode()) {
-throw new ConnectRestException(response.getStatus(),
-"Could not execute DELETE request. Error response: " + 
responseToString(response));
-}
-}
-
-/**
- * Stop an existing connector.
- *
- * @param connName name of the connector to be paused
- * @throws ConnectRestException if the REST API returns error status
- * @throws ConnectException for any other error.
- */
-public void stopConnector(String connName) {
-String url = endpointForResource(String.format("connectors/%s/stop", 
connName));
-Response response = requestPut(url, "");
-if (response.getStatus() >= 
Response.Status.BAD_REQUEST.getStatusCode()) {
-throw new ConnectRestException(response.getStatus(),
-"Could not execute PUT request. Error response: " + 
responseToString(response));
-}
-}
-
-/**
- * Pause an existing connector.
- *
- * @param connName name of the connector to be paused
- * @throws ConnectRestException if th

Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-17 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1362528851


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -302,27 +177,6 @@ public String getName() {
 return connectClusterName;
 }
 
-/**
- * Get the workers that are up and running.
- *
- * @return the list of handles of the online workers
- */
-public Set activeWorkers() {
-ObjectMapper mapper = new ObjectMapper();
-return connectCluster.stream()
-.filter(w -> {
-try {
-mapper.readerFor(ServerInfo.class)
-
.readValue(responseToString(requestGet(w.url().toString(;
-return true;
-} catch (ConnectException | IOException e) {
-// Worker failed to respond. Consider it's offline
-return false;
-}
-})
-.collect(Collectors.toSet());
-}

Review Comment:
   This is also migrated directly to the `EmbeddedConnect` class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-17 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1362527738


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -225,28 +121,6 @@ public void removeWorker(WorkerHandle worker) {
 connectCluster.remove(worker);
 }
 
-private void stopWorker(WorkerHandle worker) {
-try {
-log.info("Stopping worker {}", worker);
-worker.stop();
-} catch (UngracefulShutdownException e) {
-log.warn("Worker {} did not shutdown gracefully", worker);
-} catch (Exception e) {
-log.error("Could not stop connect", e);
-throw new RuntimeException("Could not stop worker", e);
-}
-}
-
-/**
- * Set a new timeout for REST requests to each worker in the cluster. 
Useful if a request
- * is expected to block, since the time spent awaiting that request can be 
reduced
- * and test runtime bloat can be avoided.
- * @param requestTimeoutMs the new timeout in milliseconds; must be 
positive
- */
-public void requestTimeout(long requestTimeoutMs) {
-connectCluster.forEach(worker -> 
worker.requestTimeout(requestTimeoutMs));
-}

Review Comment:
   This is also migrated directly to the `EmbeddedConnect` class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-17 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1362527136


##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -72,115 +41,42 @@
 import static 
org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG;
 
 /**
- * Start an embedded connect worker. Internally, this class will spin up a 
Kafka and Zk cluster, setup any tmp
- * directories and clean up them on them. Methods on the same {@code 
EmbeddedConnectCluster} are
+ * Start an embedded connect cluster. Internally, this class will spin up a 
Kafka and Zk cluster, set up any tmp
+ * directories, and clean them up on exit. Methods on the same {@code 
EmbeddedConnectCluster} are
  * not guaranteed to be thread-safe.
  */
-public class EmbeddedConnectCluster {
+public class EmbeddedConnectCluster extends EmbeddedConnect {
 
 private static final Logger log = 
LoggerFactory.getLogger(EmbeddedConnectCluster.class);
 
-public static final int DEFAULT_NUM_BROKERS = 1;
 public static final int DEFAULT_NUM_WORKERS = 1;
-private static final Properties DEFAULT_BROKER_CONFIG = new Properties();
 private static final String REST_HOST_NAME = "localhost";
 
 private static final String DEFAULT_WORKER_NAME_PREFIX = "connect-worker-";
 
 private final Set connectCluster;
-private final EmbeddedKafkaCluster kafkaCluster;
-private final HttpClient httpClient;
 private final Map workerProps;
 private final String connectClusterName;
-private final int numBrokers;
 private final int numInitialWorkers;
-private final boolean maskExitProcedures;
 private final String workerNamePrefix;
 private final AtomicInteger nextWorkerId = new AtomicInteger(0);
-private final EmbeddedConnectClusterAssertions assertions;
-// we should keep the original class loader and set it back after 
connector stopped since the connector will change the class loader,
-// and then, the Mockito will use the unexpected class loader to generate 
the wrong proxy instance, which makes mock failed
-private final ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();
 
-private EmbeddedConnectCluster(String name, Map 
workerProps, int numWorkers,
-   int numBrokers, Properties brokerProps,
-   boolean maskExitProcedures,
-   Map 
additionalKafkaClusterClientConfigs) {
+private EmbeddedConnectCluster(
+int numBrokers,
+Properties brokerProps,
+boolean maskExitProcedures,
+Map clientProps,
+Map workerProps,
+String name,
+int numWorkers
+) {
+super(numBrokers, brokerProps, maskExitProcedures, clientProps);
 this.workerProps = workerProps;
 this.connectClusterName = name;
-this.numBrokers = numBrokers;
-this.kafkaCluster = new EmbeddedKafkaCluster(numBrokers, brokerProps, 
additionalKafkaClusterClientConfigs);
 this.connectCluster = new LinkedHashSet<>();
-this.httpClient = new HttpClient();
 this.numInitialWorkers = numWorkers;
-this.maskExitProcedures = maskExitProcedures;
 // leaving non-configurable for now
 this.workerNamePrefix = DEFAULT_WORKER_NAME_PREFIX;
-this.assertions = new EmbeddedConnectClusterAssertions(this);
-}
-
-/**
- * A more graceful way to handle abnormal exit of services in integration 
tests.
- */
-public Exit.Procedure exitProcedure = (code, message) -> {
-if (code != 0) {
-String exitMessage = "Abrupt service exit with code " + code + " 
and message " + message;
-log.warn(exitMessage);
-throw new UngracefulShutdownException(exitMessage);
-}
-};
-
-/**
- * A more graceful way to handle abnormal halt of services in integration 
tests.
- */
-public Exit.Procedure haltProcedure = (code, message) -> {
-if (code != 0) {
-String haltMessage = "Abrupt service halt with code " + code + " 
and message " + message;
-log.warn(haltMessage);
-throw new UngracefulShutdownException(haltMessage);
-}
-};
-
-/**
- * Start the connect cluster and the embedded Kafka and Zookeeper cluster.
- */
-public void start() {
-if (maskExitProcedures) {
-Exit.setExitProcedure(exitProcedure);
-Exit.setHaltProcedure(haltProcedure);
-}
-kafkaCluster.start();
-startConnect();
-try {
-httpClient.start();
-} catch (Exception e) {
-throw new ConnectException("Failed to start HTTP client", e);
-}
-}
-
-/**
- * Stop the connect cluster and the embedded Kafka and Zookeeper cluster.
- * Clean up any temp directories creat

Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-17 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1362238029


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerTest.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@Category(IntegrationTest.class)
+public class StandaloneWorkerTest {

Review Comment:
   Ah, good catch! Done.



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util.clusters;
+
+import org.apache.kafka.connect.cli.ConnectStandalone;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG;
+import static 
org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.standalone.StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG;
+
+/**
+ * Start a standalone embedded connect worker. Internally, this class will 
spin up a Kafka and Zk cluster,
+ * setup any tmp directories and clean up them on them. Methods on the same

Review Comment:
   Shamelessly copied from the existing [EmbeddedConnectCluster 
Javadocs](https://github.com/apache/kafka/blob/e7e399b9409b42f82d7ce57b99a461c465e5849d/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java#L74-L78).
   
   I'll tweak the language to match your suggestion, which I like.



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectBuilder.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * 

Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-17 Thread via GitHub


yashmayya commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1362109618


##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerTest.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@Category(IntegrationTest.class)
+public class StandaloneWorkerTest {

Review Comment:
   The existing integration test classes use the suffix `IntegrationTest` in 
their class names to distinguish them more easily from unit test classes that 
only use the `Test` suffix; any particular reason for not doing so here?



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util.clusters;
+
+import org.apache.kafka.connect.cli.ConnectStandalone;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.WorkerConfig.PLUGIN_DISCOVERY_CONFIG;
+import static 
org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG;
+import static 
org.apache.kafka.connect.runtime.standalone.StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG;
+
+/**
+ * Start a standalone embedded connect worker. Internally, this class will 
spin up a Kafka and Zk cluster,
+ * setup any tmp directories and clean up them on them. Methods on the same

Review Comment:
   > clean up them on them
   
   Was this supposed to be something like `clean them up on exit`?



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectBuilder.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain

Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-17 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1360904810


##
tests/kafkatest/tests/connect/connect_distributed_test.py:
##
@@ -375,6 +381,159 @@ def test_pause_state_persistent(self, 
exactly_once_source, connect_protocol, met
 wait_until(lambda: self.is_paused(self.source, node), 
timeout_sec=120,
err_msg="Failed to see connector startup in PAUSED 
state")
 
+@cluster(num_nodes=5)
+def test_dynamic_logging(self):
+"""
+Test out the REST API for dynamically adjusting logging levels, on 
both a single-worker and cluster-wide basis.
+"""
+
+self.setup_services(num_workers=3)
+self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
+self.cc.start()
+
+worker = self.cc.nodes[0]
+prior_all_loggers = self.cc.get_all_loggers(worker)
+self.logger.debug("Listed all loggers via REST API: %s", 
str(prior_all_loggers))
+assert prior_all_loggers is not None
+assert 'root' in prior_all_loggers
+# We need root and at least one other namespace (the other namespace 
is checked
+# later on to make sure that it hasn't changed)
+assert len(prior_all_loggers) >= 2
+for logger in prior_all_loggers.values():
+assert logger['last_modified'] is None
+
+namespace = None
+for logger in prior_all_loggers.keys():
+if logger != 'root':
+namespace = logger
+break
+assert namespace is not None
+
+initial_level = self.cc.get_logger(worker, namespace)['level'].upper()
+# Make sure we pick a different one than what's already set for that 
namespace
+new_level = 'INFO' if initial_level != 'INFO' else 'WARN'
+self.cc.set_logger(worker, namespace, 'ERROR')
+request_time = int(time.time() * 1000)
+affected_loggers = self.cc.set_logger(worker, namespace, new_level)
+assert len(affected_loggers) >= 1
+for logger in affected_loggers:
+assert logger.startswith(namespace)
+
+assert self.loggers_set(new_level, request_time, namespace, 
workers=[worker])
+assert self.loggers_set(initial_level, None, namespace, 
workers=self.cc.nodes[1:])
+
+# Force all loggers to get updated by setting the root namespace to
+# two different levels
+# This guarantees that their last-modified times will be updated
+resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster')
+assert resp is None
+
+new_root = 'INFO'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, 'root', new_root, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_root, request_time),
+# This should be super quick--just a write+read of the config 
topic, which workers are constantly polling
+timeout_sec=10,
+err_msg="Log level for root namespace was not adjusted in a 
reasonable amount of time."
+)
+
+new_level = 'DEBUG'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_level, request_time, namespace),
+timeout_sec=10,
+err_msg='Log level for namespace ' + namespace + ' was not 
adjusted in a reasonable amount of time.'
+)
+
+prior_all_loggers = [self.cc.get_all_loggers(node) for node in 
self.cc.nodes]
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+
+prior_namespace = namespace
+new_namespace = None
+for logger, level in prior_all_loggers[0].items():
+if logger != 'root' and not logger.startswith(namespace):

Review Comment:
   That assertion (`num_loggers >= 3`) does not necessarily have to be true for 
this condition to be met, since altering a logging namespace will also cause 
child namespaces to be modified and begin to appear in the content of the `GET 
/admin/loggers` endpoint.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-17 Thread via GitHub


yashmayya commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1362100106


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##
@@ -164,6 +171,13 @@ interface UpdateListener {
  * @param restartRequest the {@link RestartRequest restart request}
  */
 void onRestartRequest(RestartRequest restartRequest);
+
+/**
+ * Invoked when a dynamic log level adjustment has been read

Review Comment:
   Sounds good



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * 
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+/**
+ * Log4j uses "root" (case-insensitive) as name of the root logger.
+ */
+private static final String ROOT_LOGGER_NAME = "root";
+
+private final Time time;
+private final Map lastModifiedTimes;
+
+public Loggers(Time time) {
+this.time = time;
+this.lastModifiedTimes = new HashMap<>();
+}
+
+/**
+ * Retrieve the current level for a single logger.
+ * @param logger the name of the logger to retrieve the level for; may not 
be null
+ * @return the current level (falling back on the effective level if 
necessary) of the logger,
+ * or null if no logger with the specified name exists
+ */
+public synchronized LoggerLevel level(String logger) {
+Objects.requireNonNull(logger, "Logger may not be null");
+
+org.apache.log4j.Logger foundLogger = null;
+if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) {
+foundLogger = rootLogger();
+} else {
+Enumeration en = currentLoggers();
+// search within existing loggers for the given name.
+// using LogManger.getLogger() will create a logger if it doesn't 
exist
+// (potential leak since these don't get cleaned up).
+while (en.hasMoreElements()) {
+org.apache.log4j.Logger l = en.nextElement();
+if (logger.equals(l.getName())) {
+foundLogger = l;
+break;
+}
+}
+}
+
+if (foundLogger == null) {
+log.warn("Unable to find level for logger {}", logger);
+return null;
+}
+
+return loggerLevel(foundLogger);
+}
+
+/**
+ * Retrieve the current levels of all known loggers
+ * @return the levels of all known loggers; may be empty, but never null
+ */
+public synchronized Map allLevels() {
+Map result = new TreeMap<>();
+
+Enumeration enumeration = currentLoggers();
+Collections.list(enumeration)
+.stream()
+.filter(logger -> logger.getLevel() != null)
+.forEach(logger -> result.put(logger.getName(), 
loggerLevel(logger)));
+
+org.apache.log4j.Logger root = rootLogger();
+if (root.getLevel() != null) {
+result.put(ROOT_LOGGER_NAME, loggerLevel(root));
+}
+
+return result;
+}
+
+/**
+ * Set the level for the specified logger and all of its children
+ * @param namespace the name of the logger to adjust along with its 
children; may not be nul
+ * @param level the level to set for the logger and its

Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-16 Thread via GitHub


C0urante commented on PR #14538:
URL: https://github.com/apache/kafka/pull/14538#issuecomment-1764882825

   Ah, never mind--I realize that some static fields for timeouts from the 
`EmbeddedConnectClusterAssertions` class may be used in downstream projects. 
I've pushed a new commit with the suggested deprecated shim. Thanks again for 
catching this, Greg!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-16 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1360788058


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * 
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+/**
+ * Log4j uses "root" (case-insensitive) as name of the root logger.
+ */
+private static final String ROOT_LOGGER_NAME = "root";
+
+private final Time time;
+private final Map lastModifiedTimes;
+
+public Loggers(Time time) {
+this.time = time;
+this.lastModifiedTimes = new HashMap<>();
+}
+
+/**
+ * Retrieve the current level for a single logger.
+ * @param logger the name of the logger to retrieve the level for; may not 
be null
+ * @return the current level (falling back on the effective level if 
necessary) of the logger,
+ * or null if no logger with the specified name exists
+ */
+public synchronized LoggerLevel level(String logger) {

Review Comment:
   Yeah, the synchronization here is fairly coarse-grained but I don't think 
it's likely to be a bottleneck (who's going to be issuing hundreds of dynamic 
log requests a second?).
   
   It's also necessary for this read-only operation to have some 
synchronization in order to ensure that log levels and their last-modified 
times are kept in-sync.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-16 Thread via GitHub


C0urante commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1360768631


##
checkstyle/suppressions.xml:
##
@@ -139,6 +139,8 @@
   files="Worker(SinkTask|SourceTask|Coordinator).java"/>
 
+

Review Comment:
   Switching on enums still requires a default case; there's a good explanation 
on 
[StackOverflow](https://stackoverflow.com/questions/5013194/why-is-default-required-for-a-switch-on-an-enum)
 for why.



##
tests/kafkatest/tests/connect/connect_distributed_test.py:
##
@@ -375,6 +381,159 @@ def test_pause_state_persistent(self, 
exactly_once_source, connect_protocol, met
 wait_until(lambda: self.is_paused(self.source, node), 
timeout_sec=120,
err_msg="Failed to see connector startup in PAUSED 
state")
 
+@cluster(num_nodes=5)
+def test_dynamic_logging(self):
+"""
+Test out the REST API for dynamically adjusting logging levels, on 
both a single-worker and cluster-wide basis.
+"""
+
+self.setup_services(num_workers=3)
+self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
+self.cc.start()
+
+worker = self.cc.nodes[0]
+prior_all_loggers = self.cc.get_all_loggers(worker)
+self.logger.debug("Listed all loggers via REST API: %s", 
str(prior_all_loggers))
+assert prior_all_loggers is not None
+assert 'root' in prior_all_loggers
+# We need root and at least one other namespace (the other namespace 
is checked
+# later on to make sure that it hasn't changed)
+assert len(prior_all_loggers) >= 2
+for logger in prior_all_loggers.values():
+assert logger['last_modified'] is None
+
+namespace = None
+for logger in prior_all_loggers.keys():
+if logger != 'root':
+namespace = logger
+break
+assert namespace is not None
+
+initial_level = self.cc.get_logger(worker, namespace)['level'].upper()
+# Make sure we pick a different one than what's already set for that 
namespace
+new_level = 'INFO' if initial_level != 'INFO' else 'WARN'
+self.cc.set_logger(worker, namespace, 'ERROR')
+request_time = int(time.time() * 1000)
+affected_loggers = self.cc.set_logger(worker, namespace, new_level)
+assert len(affected_loggers) >= 1
+for logger in affected_loggers:
+assert logger.startswith(namespace)
+
+assert self.loggers_set(new_level, request_time, namespace, 
workers=[worker])
+assert self.loggers_set(initial_level, None, namespace, 
workers=self.cc.nodes[1:])
+
+# Force all loggers to get updated by setting the root namespace to
+# two different levels
+# This guarantees that their last-modified times will be updated
+resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster')
+assert resp is None
+
+new_root = 'INFO'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, 'root', new_root, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_root, request_time),
+# This should be super quick--just a write+read of the config 
topic, which workers are constantly polling
+timeout_sec=10,
+err_msg="Log level for root namespace was not adjusted in a 
reasonable amount of time."
+)
+
+new_level = 'DEBUG'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_level, request_time, namespace),
+timeout_sec=10,
+err_msg='Log level for namespace ' + namespace + ' was not 
adjusted in a reasonable amount of time.'
+)
+
+prior_all_loggers = [self.cc.get_all_loggers(node) for node in 
self.cc.nodes]
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+
+prior_namespace = namespace
+new_namespace = None
+for logger, level in prior_all_loggers[0].items():
+if logger != 'root' and not logger.startswith(namespace):

Review Comment:
   That assertion (`num_loggers = 3`) does not necessarily have to be true for 
this condition to be met, since altering a logging namespace will also cause 
child namespaces to be modified and begin to appear in the content of the `GET 
/admin/loggers` endpoint.



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -297,8 +297,8 @@ public void 
testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
 // When automatic topic creation is disabled on the broker
 brokerProps.put("auto.cr

Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-16 Thread via GitHub


yashmayya commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1360494846


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * 
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+/**
+ * Log4j uses "root" (case-insensitive) as name of the root logger.
+ */
+private static final String ROOT_LOGGER_NAME = "root";
+
+private final Time time;
+private final Map lastModifiedTimes;
+
+public Loggers(Time time) {
+this.time = time;
+this.lastModifiedTimes = new HashMap<>();
+}
+
+/**
+ * Retrieve the current level for a single logger.
+ * @param logger the name of the logger to retrieve the level for; may not 
be null
+ * @return the current level (falling back on the effective level if 
necessary) of the logger,
+ * or null if no logger with the specified name exists
+ */
+public synchronized LoggerLevel level(String logger) {
+Objects.requireNonNull(logger, "Logger may not be null");
+
+org.apache.log4j.Logger foundLogger = null;
+if (ROOT_LOGGER_NAME.equalsIgnoreCase(logger)) {
+foundLogger = rootLogger();
+} else {
+Enumeration en = currentLoggers();
+// search within existing loggers for the given name.
+// using LogManger.getLogger() will create a logger if it doesn't 
exist
+// (potential leak since these don't get cleaned up).
+while (en.hasMoreElements()) {
+org.apache.log4j.Logger l = en.nextElement();
+if (logger.equals(l.getName())) {
+foundLogger = l;
+break;
+}
+}
+}
+
+if (foundLogger == null) {
+log.warn("Unable to find level for logger {}", logger);
+return null;
+}
+
+return loggerLevel(foundLogger);
+}
+
+/**
+ * Retrieve the current levels of all known loggers
+ * @return the levels of all known loggers; may be empty, but never null
+ */
+public synchronized Map allLevels() {
+Map result = new TreeMap<>();
+
+Enumeration enumeration = currentLoggers();
+Collections.list(enumeration)
+.stream()
+.filter(logger -> logger.getLevel() != null)
+.forEach(logger -> result.put(logger.getName(), 
loggerLevel(logger)));
+
+org.apache.log4j.Logger root = rootLogger();
+if (root.getLevel() != null) {
+result.put(ROOT_LOGGER_NAME, loggerLevel(root));
+}
+
+return result;
+}
+
+/**
+ * Set the level for the specified logger and all of its children
+ * @param namespace the name of the logger to adjust along with its 
children; may not be nul

Review Comment:
   ```suggestion
* @param namespace the name of the logger to adjust along with its 
children; may not be null
   ```
   nit



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -917,4 +924,27 @@ public void resetConnectorOffsets(String connName, 
Callback callback) {
  * @param cb callback to invoke upon completion
  */
 protected abstract void modifyConnectorOffsets(String connName, 
Map, M

Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-13 Thread via GitHub


gharris1727 commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1358597074


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * 
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+/**
+ * Log4j uses "root" (case-insensitive) as name of the root logger.
+ */
+private static final String ROOT_LOGGER_NAME = "root";
+
+private final Time time;
+private final Map lastModifiedTimes;
+
+public Loggers(Time time) {
+this.time = time;
+this.lastModifiedTimes = new HashMap<>();
+}
+
+/**
+ * Retrieve the current level for a single logger.
+ * @param logger the name of the logger to retrieve the level for; may not 
be null
+ * @return the current level (falling back on the effective level if 
necessary) of the logger,
+ * or null if no logger with the specified name exists
+ */
+public synchronized LoggerLevel level(String logger) {

Review Comment:
   @yangy Writes must have an exclusive lock for thread safety, meaning 
that no concurrent reads are allowed while a write is happening. The 
synchronized lock here makes sure that no writes are ongoing. It's a bit 
stronger than necessary: multiple concurrent readers could be allowed, but 
multiple readers sharing a lock is a little bit more complex, and probably not 
worth the performance difference. These methods are not called in any hotpath.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-12 Thread via GitHub


yangy commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1357785014


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * 
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+/**
+ * Log4j uses "root" (case-insensitive) as name of the root logger.
+ */
+private static final String ROOT_LOGGER_NAME = "root";
+
+private final Time time;
+private final Map lastModifiedTimes;
+
+public Loggers(Time time) {
+this.time = time;
+this.lastModifiedTimes = new HashMap<>();
+}
+
+/**
+ * Retrieve the current level for a single logger.
+ * @param logger the name of the logger to retrieve the level for; may not 
be null
+ * @return the current level (falling back on the effective level if 
necessary) of the logger,
+ * or null if no logger with the specified name exists
+ */
+public synchronized LoggerLevel level(String logger) {

Review Comment:
   looks like this method is read only method,  what's the reason for putting 
synchronized lock in here? 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-12 Thread via GitHub


yangy commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1357779469


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Loggers.java:
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * Manges logging levels on a single worker. Supports dynamic adjustment and 
querying
+ * of logging levels.
+ * 
+ * This class is thread-safe; concurrent calls to all of its public methods 
from any number
+ * of threads are permitted.
+ */
+public class Loggers {
+
+private static final Logger log = LoggerFactory.getLogger(Loggers.class);
+
+/**
+ * Log4j uses "root" (case-insensitive) as name of the root logger.
+ */
+private static final String ROOT_LOGGER_NAME = "root";
+
+private final Time time;
+private final Map lastModifiedTimes;
+
+public Loggers(Time time) {
+this.time = time;
+this.lastModifiedTimes = new HashMap<>();
+}
+
+/**
+ * Retrieve the current level for a single logger.

Review Comment:
   nit: how about use " 3rd person declarative" for java doc?  ref: 
https://www.oracle.com/technical-resources/articles/java/javadoc-tool.html
   
   "The description is in 3rd person declarative rather than 2nd person 
imperative."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-12 Thread via GitHub


gharris1727 commented on code in PR #14538:
URL: https://github.com/apache/kafka/pull/14538#discussion_r1357258143


##
checkstyle/suppressions.xml:
##
@@ -139,6 +139,8 @@
   files="Worker(SinkTask|SourceTask|Coordinator).java"/>
 
+

Review Comment:
   nit: we could probably avoid this warning with enum parsing and a fallback 
enum.



##
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##
@@ -297,8 +297,8 @@ public void 
testSourceTaskNotBlockedOnShutdownWithNonExistentTopic() throws Exce
 // When automatic topic creation is disabled on the broker
 brokerProps.put("auto.create.topics.enable", "false");
 connect = connectBuilder
-.brokerProps(brokerProps)
 .numWorkers(1)
+.brokerProps(brokerProps)

Review Comment:
   nit: is this necessary?



##
tests/kafkatest/tests/connect/connect_distributed_test.py:
##
@@ -375,6 +381,159 @@ def test_pause_state_persistent(self, 
exactly_once_source, connect_protocol, met
 wait_until(lambda: self.is_paused(self.source, node), 
timeout_sec=120,
err_msg="Failed to see connector startup in PAUSED 
state")
 
+@cluster(num_nodes=5)
+def test_dynamic_logging(self):
+"""
+Test out the REST API for dynamically adjusting logging levels, on 
both a single-worker and cluster-wide basis.
+"""
+
+self.setup_services(num_workers=3)
+self.cc.set_configs(lambda node: 
self.render("connect-distributed.properties", node=node))
+self.cc.start()
+
+worker = self.cc.nodes[0]
+prior_all_loggers = self.cc.get_all_loggers(worker)
+self.logger.debug("Listed all loggers via REST API: %s", 
str(prior_all_loggers))
+assert prior_all_loggers is not None
+assert 'root' in prior_all_loggers
+# We need root and at least one other namespace (the other namespace 
is checked
+# later on to make sure that it hasn't changed)
+assert len(prior_all_loggers) >= 2
+for logger in prior_all_loggers.values():
+assert logger['last_modified'] is None
+
+namespace = None
+for logger in prior_all_loggers.keys():
+if logger != 'root':
+namespace = logger
+break
+assert namespace is not None
+
+initial_level = self.cc.get_logger(worker, namespace)['level'].upper()
+# Make sure we pick a different one than what's already set for that 
namespace
+new_level = 'INFO' if initial_level != 'INFO' else 'WARN'
+self.cc.set_logger(worker, namespace, 'ERROR')
+request_time = int(time.time() * 1000)
+affected_loggers = self.cc.set_logger(worker, namespace, new_level)
+assert len(affected_loggers) >= 1
+for logger in affected_loggers:
+assert logger.startswith(namespace)
+
+assert self.loggers_set(new_level, request_time, namespace, 
workers=[worker])
+assert self.loggers_set(initial_level, None, namespace, 
workers=self.cc.nodes[1:])
+
+# Force all loggers to get updated by setting the root namespace to
+# two different levels
+# This guarantees that their last-modified times will be updated
+resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster')
+assert resp is None
+
+new_root = 'INFO'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, 'root', new_root, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_root, request_time),
+# This should be super quick--just a write+read of the config 
topic, which workers are constantly polling
+timeout_sec=10,
+err_msg="Log level for root namespace was not adjusted in a 
reasonable amount of time."
+)
+
+new_level = 'DEBUG'
+request_time = int(time.time() * 1000)
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+wait_until(
+lambda: self.loggers_set(new_level, request_time, namespace),
+timeout_sec=10,
+err_msg='Log level for namespace ' + namespace + ' was not 
adjusted in a reasonable amount of time.'
+)
+
+prior_all_loggers = [self.cc.get_all_loggers(node) for node in 
self.cc.nodes]
+resp = self.cc.set_logger(worker, namespace, new_level, 'cluster')
+assert resp is None
+
+prior_namespace = namespace
+new_namespace = None
+for logger, level in prior_all_loggers[0].items():
+if logger != 'root' and not logger.startswith(namespace):

Review Comment:
   In order for this to be true, there must be at least two non-overlapping 
namespaces not including root. With root included, does that mean that the 
earlier `ass

Re: [PR] KAFKA-15428: Cluster-wide dynamic log adjustments for Connect [kafka]

2023-10-12 Thread via GitHub


C0urante commented on PR #14538:
URL: https://github.com/apache/kafka/pull/14538#issuecomment-1760004960

   @gharris1727, @yashmayya, @mimaison would any of you be able to take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org