mimaison commented on code in PR #21289: URL: https://github.com/apache/kafka/pull/21289#discussion_r2681825982
########## server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.config.SocketServerConfigs; + +import org.apache.commons.validator.routines.InetAddressValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +/** + * General helper functions! + * <p> + * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in + * the standard library etc. + * <p> + * If you are making a new helper function and want to add it to this class please ensure the following: + * 1. 
It has documentation + * 2. It is the most general possible utility, not just the thing you needed in one particular place + * 3. You have tests for it if it is nontrivial in any way + */ +public class CoreUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(CoreUtils.class); + + /** + * Do the given action and log any exceptions thrown without rethrowing them. + * + * @param action The action to execute. + * @param logging The logging instance to use for logging the thrown exception. + * @param logLevel The log level to use for logging. + */ + public static void swallow(Runnable action, Logger logging, Level logLevel) { + try { + action.run(); + } catch (Throwable e) { + switch (logLevel) { + case ERROR -> logging.error(e.getMessage(), e); + case INFO -> logging.info(e.getMessage(), e); + case DEBUG -> logging.debug(e.getMessage(), e); + case TRACE -> logging.trace(e.getMessage(), e); + case WARN -> logging.warn(e.getMessage(), e); + } + } + } + + /** + * Do the given action and log any exceptions thrown without rethrowing them. + * Uses {@link Level#WARN} as the default logging level. + * + * @param action The action to execute. + * @param logging The logging instance to use for logging the thrown exception. + */ + public static void swallow(Runnable action, Logger logging) { + swallow(action, logging, Level.WARN); + } + + /** + * Recursively delete the list of files/directories and any subfiles (if any exist) + * + * @param files list of files to be deleted + */ + public static void delete(List<String> files) throws IOException { Review Comment: This is only used by tests so we shouldn't have this in `src/main`. We could move it to `TestUtils` or even inline it, as it's really short; it could even be a one-liner if using the functional syntax `forEach()`. 
########## server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.config.SocketServerConfigs; + +import org.apache.commons.validator.routines.InetAddressValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +/** + * General helper functions! + * <p> + * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in + * the standard library etc. + * <p> + * If you are making a new helper function and want to add it to this class please ensure the following: + * 1. 
It has documentation + * 2. It is the most general possible utility, not just the thing you needed in one particular place + * 3. You have tests for it if it is nontrivial in any way + */ +public class CoreUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(CoreUtils.class); + + /** + * Do the given action and log any exceptions thrown without rethrowing them. + * + * @param action The action to execute. + * @param logging The logging instance to use for logging the thrown exception. + * @param logLevel The log level to use for logging. + */ + public static void swallow(Runnable action, Logger logging, Level logLevel) { + try { + action.run(); + } catch (Throwable e) { + switch (logLevel) { + case ERROR -> logging.error(e.getMessage(), e); + case INFO -> logging.info(e.getMessage(), e); + case DEBUG -> logging.debug(e.getMessage(), e); + case TRACE -> logging.trace(e.getMessage(), e); + case WARN -> logging.warn(e.getMessage(), e); + } + } + } + + /** + * Do the given action and log any exceptions thrown without rethrowing them. + * Uses {@link Level#WARN} as the default logging level. + * + * @param action The action to execute. + * @param logging The logging instance to use for logging the thrown exception. + */ + public static void swallow(Runnable action, Logger logging) { + swallow(action, logging, Level.WARN); + } + + /** + * Recursively delete the list of files/directories and any subfiles (if any exist) + * + * @param files list of files to be deleted + */ + public static void delete(List<String> files) throws IOException { + for (String file : files) { + Utils.delete(new File(file)); + } + } + + /** + * Register the given mbean with the platform mbean server, + * unregistering any mbean that was there before. Note, + * this method will not throw an exception if the registration + * fails (since there is nothing you can do, and it isn't fatal), + * instead it just returns false indicating the registration failed. 
+ * + * @param mbean The object to register as a mbean + * @param name The name to register this mbean with + * @return true if the registration succeeded + */ + public static boolean registerMBean(Object mbean, String name) { Review Comment: See my other comments, but it seems all other methods could be deleted or moved elsewhere. That leaves us with only this method. It does not make sense to have a utility class for a single method. ########## server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.config.SocketServerConfigs; + +import org.apache.commons.validator.routines.InetAddressValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +/** + * General helper functions! + * <p> + * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in + * the standard library etc. + * <p> + * If you are making a new helper function and want to add it to this class please ensure the following: + * 1. It has documentation + * 2. It is the most general possible utility, not just the thing you needed in one particular place + * 3. You have tests for it if it is nontrivial in any way + */ +public class CoreUtils { Review Comment: The class name does not make sense anymore, as it's not in the core module anymore. ########## server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.config.SocketServerConfigs; + +import org.apache.commons.validator.routines.InetAddressValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +/** + * General helper functions! + * <p> + * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in + * the standard library etc. + * <p> + * If you are making a new helper function and want to add it to this class please ensure the following: + * 1. It has documentation + * 2. It is the most general possible utility, not just the thing you needed in one particular place + * 3. You have tests for it if it is nontrivial in any way + */ +public class CoreUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(CoreUtils.class); + + /** + * Do the given action and log any exceptions thrown without rethrowing them. + * + * @param action The action to execute. 
+ * @param logging The logging instance to use for logging the thrown exception. + * @param logLevel The log level to use for logging. + */ + public static void swallow(Runnable action, Logger logging, Level logLevel) { Review Comment: Do we really need this method? Could we use `Utils.closeQuietly()` instead? ########## server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.util; + +import org.apache.kafka.common.Endpoint; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.config.SocketServerConfigs; + +import org.apache.commons.validator.routines.InetAddressValidator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import java.io.File; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +/** + * General helper functions! + * <p> + * This is for general helper functions that aren't specific to Kafka logic. Things that should have been included in + * the standard library etc. + * <p> + * If you are making a new helper function and want to add it to this class please ensure the following: + * 1. It has documentation + * 2. It is the most general possible utility, not just the thing you needed in one particular place + * 3. You have tests for it if it is nontrivial in any way + */ +public class CoreUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(CoreUtils.class); + + /** + * Do the given action and log any exceptions thrown without rethrowing them. + * + * @param action The action to execute. + * @param logging The logging instance to use for logging the thrown exception. + * @param logLevel The log level to use for logging. 
+ */ + public static void swallow(Runnable action, Logger logging, Level logLevel) { + try { + action.run(); + } catch (Throwable e) { + switch (logLevel) { + case ERROR -> logging.error(e.getMessage(), e); + case INFO -> logging.info(e.getMessage(), e); + case DEBUG -> logging.debug(e.getMessage(), e); + case TRACE -> logging.trace(e.getMessage(), e); + case WARN -> logging.warn(e.getMessage(), e); + } + } + } + + /** + * Do the given action and log any exceptions thrown without rethrowing them. + * Uses {@link Level#WARN} as the default logging level. + * + * @param action The action to execute. + * @param logging The logging instance to use for logging the thrown exception. + */ + public static void swallow(Runnable action, Logger logging) { + swallow(action, logging, Level.WARN); + } + + /** + * Recursively delete the list of files/directories and any subfiles (if any exist) + * + * @param files list of files to be deleted + */ + public static void delete(List<String> files) throws IOException { + for (String file : files) { + Utils.delete(new File(file)); + } + } + + /** + * Register the given mbean with the platform mbean server, + * unregistering any mbean that was there before. Note, + * this method will not throw an exception if the registration + * fails (since there is nothing you can do, and it isn't fatal), + * instead it just returns false indicating the registration failed. 
+ * + * @param mbean The object to register as a mbean + * @param name The name to register this mbean with + * @return true if the registration succeeded + */ + public static boolean registerMBean(Object mbean, String name) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + synchronized (mbs) { + ObjectName objName = new ObjectName(name); + if (mbs.isRegistered(objName)) { + mbs.unregisterMBean(objName); + } + mbs.registerMBean(mbean, objName); + return true; + } + } catch (Exception e) { + LOGGER.error("Failed to register Mbean with name {}", name, e); + return false; + } + } + + public static List<Endpoint> listenerListToEndPoints(List<String> listeners, Map<ListenerName, SecurityProtocol> securityProtocolMap) { Review Comment: All the methods below are only used by `KafkaConfig` so they should probably be moved there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
