gharris1727 commented on code in PR #14064: URL: https://github.com/apache/kafka/pull/14064#discussion_r1286420767
########## tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java: ########## @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.impl.Arguments; +import net.sourceforge.argparse4j.inf.ArgumentGroup; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory; +import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader; +import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.PluginScanResult; +import org.apache.kafka.connect.runtime.isolation.PluginSource; +import org.apache.kafka.connect.runtime.isolation.PluginType; +import org.apache.kafka.connect.runtime.isolation.PluginUtils; +import org.apache.kafka.connect.runtime.isolation.ReflectionScanner; +import 
org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.PrintStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ConnectPluginPath { + + private static final String MANIFEST_PREFIX = "META-INF/services/"; + private static final Object[] LIST_TABLE_COLUMNS = { + "pluginName", + "firstAlias", + "secondAlias", + "pluginVersion", + "pluginType", + "isLoadable", + "hasManifest", + "pluginLocation" // last because it is least important and most repetitive + }; + + public static void main(String[] args) { + Exit.exit(mainNoExit(args, System.out, System.err)); + } + + public static int mainNoExit(String[] args, PrintStream out, PrintStream err) { + ArgumentParser parser = parser(); + try { + Namespace namespace = parser.parseArgs(args); + Config config = parseConfig(parser, namespace, out); + runCommand(config); + return 0; + } catch (ArgumentParserException e) { + parser.handleError(e); + return 1; + } catch (TerseException e) { + err.println(e.getMessage()); + return 2; + } catch (Throwable e) { + err.println(e.getMessage()); + err.println(Utils.stackTrace(e)); + return 3; + } + } + + private static ArgumentParser parser() { + ArgumentParser parser = 
ArgumentParsers.newArgumentParser("connect-plugin-path") + .defaultHelp(true) + .description("Manage plugins on the Connect plugin.path"); + + ArgumentParser listCommand = parser.addSubparsers() + .description("List information about plugins contained within the specified plugin locations") + .dest("subcommand") + .addParser("list"); + + ArgumentParser[] subparsers = new ArgumentParser[] { + listCommand, + }; + + for (ArgumentParser subparser : subparsers) { + ArgumentGroup pluginProviders = subparser.addArgumentGroup("plugin providers"); + pluginProviders.addArgument("--plugin-location") + .setDefault(new ArrayList<>()) + .action(Arguments.append()) + .help("A single plugin location (jar file or directory)"); + + pluginProviders.addArgument("--plugin-path") + .setDefault(new ArrayList<>()) + .action(Arguments.append()) + .help("A comma-delimited list of locations containing plugins"); + + pluginProviders.addArgument("--worker-config") + .setDefault(new ArrayList<>()) + .action(Arguments.append()) + .help("A Connect worker configuration file"); + } + + return parser; + } + + private static Config parseConfig(ArgumentParser parser, Namespace namespace, PrintStream out) throws ArgumentParserException, TerseException { + Set<Path> locations = parseLocations(parser, namespace); + String subcommand = namespace.getString("subcommand"); + if (subcommand == null) { + throw new ArgumentParserException("No subcommand specified", parser); + } + switch (subcommand) { + case "list": + return new Config(Command.LIST, locations, out); + default: + throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser); + } + } + + private static Set<Path> parseLocations(ArgumentParser parser, Namespace namespace) throws ArgumentParserException, TerseException { + List<String> rawLocations = new ArrayList<>(namespace.getList("plugin_location")); + List<String> rawPluginPaths = new ArrayList<>(namespace.getList("plugin_path")); + List<String> rawWorkerConfigs = 
new ArrayList<>(namespace.getList("worker_config")); + if (rawLocations.isEmpty() && rawPluginPaths.isEmpty() && rawWorkerConfigs.isEmpty()) { + throw new ArgumentParserException("Must specify at least one --plugin-location, --plugin-path, or --worker-config", parser); + } + Set<Path> pluginLocations = new LinkedHashSet<>(); + for (String rawWorkerConfig : rawWorkerConfigs) { + Properties properties; + try { + properties = Utils.loadProps(rawWorkerConfig); + } catch (IOException e) { + throw new TerseException("Unable to read worker config at " + rawWorkerConfig); + } + String pluginPath = properties.getProperty(WorkerConfig.PLUGIN_PATH_CONFIG); + if (pluginPath != null) { + rawPluginPaths.add(pluginPath); + } + } + for (String rawPluginPath : rawPluginPaths) { + try { + pluginLocations.addAll(PluginUtils.pluginLocations(rawPluginPath, true)); + } catch (UncheckedIOException e) { + throw new TerseException("Unable to parse plugin path " + rawPluginPath + ": " + e.getMessage()); + } + } + for (String rawLocation : rawLocations) { + Path pluginLocation = Paths.get(rawLocation); + if (!pluginLocation.toFile().exists()) { + throw new TerseException("Specified location " + pluginLocation + " does not exist"); + } + pluginLocations.add(pluginLocation); + } + return pluginLocations; + } + + private static Path makeDistinct(Path path) { + // When the same jar is present on the classpath and inside a pluginLocation, we still need to be able + // to distinguish between them. In order to make them more likely to be distinct, change the path to + // include an arbitrary change that makes them still resolve the same file, but fail direct equality checks. + Path parent = path.getParent(); + if (parent == null) { + // At the root of the filesystem, add the current directory marker to the end. + return path.resolve("."); + } else { + // If we have a parent directory, add the current directory marker there. 
+ return parent.resolve(".").resolve(path.getFileName()); + } + } Review Comment:
> What do you think about adding a boolean isolated field to ManifestEntry that can be used to distinguish between classpath and non-classpath plugins?

Unfortunately this doesn't help, because the ambiguity arises before we construct the ManifestEntry. Here's the situation:
* `/path/to/X.jar` contains a manifest file named Y.
* Both the classpath and at least one pluginLocation list jar X.
* `AppClassLoader#getResources(Y)` returns `[jar:file:/path/to/X.jar!/Y]`, which then becomes one ManifestEntry.
* `PluginClassLoader#getResources(Y)` returns `[jar:file:/path/to/X.jar!/Y, jar:file:/path/to/X.jar!/Y]`, which then also becomes just one ManifestEntry, because the two identical URLs collapse under Set equality.

My previous solution was to change `PluginClassLoader#getResources(Y)` to return `[jar:file:/path/to/X.jar!/Y, jar:file:/path/to/./X.jar!/Y]` so that it would become two different ManifestEntry objects. The new solution keeps the original `PluginClassLoader#getResources(Y)`, which returns `[jar:file:/path/to/X.jar!/Y, jar:file:/path/to/X.jar!/Y]`, but creates two identical ManifestEntry objects in a List. Then, when processing the excludes, one copy is removed and the other stays behind.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org