gharris1727 commented on code in PR #14064:
URL: https://github.com/apache/kafka/pull/14064#discussion_r1286420767


##########
tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.impl.Arguments;
+import net.sourceforge.argparse4j.inf.ArgumentGroup;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.ClassLoaderFactory;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginScanResult;
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+import org.apache.kafka.connect.runtime.isolation.PluginUtils;
+import org.apache.kafka.connect.runtime.isolation.ReflectionScanner;
+import org.apache.kafka.connect.runtime.isolation.ServiceLoaderScanner;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ConnectPluginPath {
+
+    private static final String MANIFEST_PREFIX = "META-INF/services/";
+    private static final Object[] LIST_TABLE_COLUMNS = {
+        "pluginName",
+        "firstAlias",
+        "secondAlias",
+        "pluginVersion",
+        "pluginType",
+        "isLoadable",
+        "hasManifest",
+        "pluginLocation" // last because it is least important and most 
repetitive
+    };
+
+    public static void main(String[] args) {
+        Exit.exit(mainNoExit(args, System.out, System.err));
+    }
+
+    public static int mainNoExit(String[] args, PrintStream out, PrintStream 
err) {
+        ArgumentParser parser = parser();
+        try {
+            Namespace namespace = parser.parseArgs(args);
+            Config config = parseConfig(parser, namespace, out);
+            runCommand(config);
+            return 0;
+        } catch (ArgumentParserException e) {
+            parser.handleError(e);
+            return 1;
+        } catch (TerseException e) {
+            err.println(e.getMessage());
+            return 2;
+        } catch (Throwable e) {
+            err.println(e.getMessage());
+            err.println(Utils.stackTrace(e));
+            return 3;
+        }
+    }
+
+    private static ArgumentParser parser() {
+        ArgumentParser parser = 
ArgumentParsers.newArgumentParser("connect-plugin-path")
+            .defaultHelp(true)
+            .description("Manage plugins on the Connect plugin.path");
+
+        ArgumentParser listCommand = parser.addSubparsers()
+            .description("List information about plugins contained within the 
specified plugin locations")
+            .dest("subcommand")
+            .addParser("list");
+
+        ArgumentParser[] subparsers = new ArgumentParser[] {
+            listCommand,
+        };
+
+        for (ArgumentParser subparser : subparsers) {
+            ArgumentGroup pluginProviders = subparser.addArgumentGroup("plugin 
providers");
+            pluginProviders.addArgument("--plugin-location")
+                .setDefault(new ArrayList<>())
+                .action(Arguments.append())
+                .help("A single plugin location (jar file or directory)");
+
+            pluginProviders.addArgument("--plugin-path")
+                .setDefault(new ArrayList<>())
+                .action(Arguments.append())
+                .help("A comma-delimited list of locations containing 
plugins");
+
+            pluginProviders.addArgument("--worker-config")
+                .setDefault(new ArrayList<>())
+                .action(Arguments.append())
+                .help("A Connect worker configuration file");
+        }
+
+        return parser;
+    }
+
+    private static Config parseConfig(ArgumentParser parser, Namespace 
namespace, PrintStream out) throws ArgumentParserException, TerseException {
+        Set<Path> locations = parseLocations(parser, namespace);
+        String subcommand = namespace.getString("subcommand");
+        if (subcommand == null) {
+            throw new ArgumentParserException("No subcommand specified", 
parser);
+        }
+        switch (subcommand) {
+            case "list":
+                return new Config(Command.LIST, locations, out);
+            default:
+                throw new ArgumentParserException("Unrecognized subcommand: '" 
+ subcommand + "'", parser);
+        }
+    }
+
+    private static Set<Path> parseLocations(ArgumentParser parser, Namespace 
namespace) throws ArgumentParserException, TerseException {
+        List<String> rawLocations = new 
ArrayList<>(namespace.getList("plugin_location"));
+        List<String> rawPluginPaths = new 
ArrayList<>(namespace.getList("plugin_path"));
+        List<String> rawWorkerConfigs = new 
ArrayList<>(namespace.getList("worker_config"));
+        if (rawLocations.isEmpty() && rawPluginPaths.isEmpty() && 
rawWorkerConfigs.isEmpty()) {
+            throw new ArgumentParserException("Must specify at least one 
--plugin-location, --plugin-path, or --worker-config", parser);
+        }
+        Set<Path> pluginLocations = new LinkedHashSet<>();
+        for (String rawWorkerConfig : rawWorkerConfigs) {
+            Properties properties;
+            try {
+                properties = Utils.loadProps(rawWorkerConfig);
+            } catch (IOException e) {
+                throw new TerseException("Unable to read worker config at " + 
rawWorkerConfig);
+            }
+            String pluginPath = 
properties.getProperty(WorkerConfig.PLUGIN_PATH_CONFIG);
+            if (pluginPath != null) {
+                rawPluginPaths.add(pluginPath);
+            }
+        }
+        for (String rawPluginPath : rawPluginPaths) {
+            try {
+                
pluginLocations.addAll(PluginUtils.pluginLocations(rawPluginPath, true));
+            } catch (UncheckedIOException e) {
+                throw new TerseException("Unable to parse plugin path " + 
rawPluginPath + ": " + e.getMessage());
+            }
+        }
+        for (String rawLocation : rawLocations) {
+            Path pluginLocation = Paths.get(rawLocation);
+            if (!pluginLocation.toFile().exists()) {
+                throw new TerseException("Specified location " + 
pluginLocation + " does not exist");
+            }
+            pluginLocations.add(pluginLocation);
+        }
+        return pluginLocations;
+    }
+
+    private static Path makeDistinct(Path path) {
+        // When the same jar is present on the classpath and inside a 
pluginLocation, we still need to be able
+        // to distinguish between them. In order to make them more likely to 
be distinct, change the path to
+        // include an arbitrary change that makes them still resolve the same 
file, but fail direct equality checks.
+        Path parent = path.getParent();
+        if (parent == null) {
+            // At the root of the filesystem, add the current directory marker 
to the end.
+            return path.resolve(".");
+        } else {
+            // If we have a parent directory, add the current directory marker 
there.
+            return parent.resolve(".").resolve(path.getFileName());
+        }
+    }

Review Comment:
   > What do you think about adding a boolean isolated field to ManifestEntry 
that can be used to distinguish between classpath and non-classpath plugins?
   
   Unfortunately this doesn't help because the ambiguity happens before we 
construct the ManifestEntry. Here's the situation:
   
   * `/path/to/X.jar` contains a manifest file named Y.
   * Both the classpath and at least one pluginLocation list jar X.
   * `AppClassLoader#getResources(Y)` returns `[jar:file:/path/to/X.jar!/Y]` 
which then becomes one ManifestEntry.
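   * `PluginClassLoader#getResources(Y)` returns `[jar:file:/path/to/X.jar!/Y, 
jar:file:/path/to/X.jar!/Y]`; these two identical URLs then collapse into one 
ManifestEntry due to Set equality, so the classpath copy and the plugin copy 
can no longer be distinguished.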
   
   My previous solution was to change `PluginClassLoader#getResources(Y)` to 
return `[jar:file:/path/to/X.jar!/Y, jar:file:/path/to/./X.jar!/Y]` so that it 
would become two different ManifestEntry objects.
   
   The new solution keeps the original `PluginClassLoader#getResources(Y)` 
which returns `[jar:file:/path/to/X.jar!/Y, jar:file:/path/to/X.jar!/Y]`, but 
creates two identical ManifestEntry objects in a List. Then when processing the 
excludes, one copy is removed and the other stays behind.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to