Repository: kafka
Updated Branches:
  refs/heads/trunk 561dd3864 -> 5ec6765bd


KAFKA-6087: Scanning plugin.path needs to support relative symlinks.

Author: Konstantine Karantasis <konstant...@confluent.io>

Reviewers: Ewen Cheslack-Postava <e...@confluent.io>

Closes #4092 from 
kkonstantine/KAFKA-6087-Scanning-plugin.path-needs-to-support-relative-symlinks


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5ec6765b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5ec6765b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5ec6765b

Branch: refs/heads/trunk
Commit: 5ec6765bdbe136951d737f30b5da62b07cc5cc68
Parents: 561dd38
Author: Konstantine Karantasis <konstant...@confluent.io>
Authored: Thu Oct 19 14:24:57 2017 -0700
Committer: Ewen Cheslack-Postava <m...@ewencp.org>
Committed: Thu Oct 19 14:24:57 2017 -0700

----------------------------------------------------------------------
 .../connect/runtime/isolation/PluginUtils.java  |  70 ++++++++-
 .../runtime/isolation/PluginUtilsTest.java      | 141 +++++++++++++++++++
 2 files changed, 209 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5ec6765b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index edc1636..9ef7c3a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -35,6 +35,9 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 
+/**
+ * Connect plugin utility methods.
+ */
 public class PluginUtils {
     private static final Logger log = 
LoggerFactory.getLogger(PluginUtils.class);
 
@@ -135,19 +138,45 @@ public class PluginUtils {
         }
     };
 
+    /**
+     * Return whether the class with the given name should be loaded in 
isolation using a plugin
+     * classloader.
+     *
+     * @param name the fully qualified name of the class.
+     * @return true if this class should be loaded in isolation, false 
otherwise.
+     */
     public static boolean shouldLoadInIsolation(String name) {
         return !(name.matches(BLACKLIST) && !name.matches(WHITELIST));
     }
 
+    /**
+     * Verify the given class corresponds to a concrete class and not to an 
abstract class or
+     * interface.
+     * @param klass the class object.
+     * @return true if the argument is a concrete class, false if it's 
abstract or an interface.
+     */
     public static boolean isConcrete(Class<?> klass) {
         int mod = klass.getModifiers();
         return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod);
     }
 
+    /**
+     * Return whether a path corresponds to a JAR or ZIP archive.
+     *
+     * @param path the path to validate.
+     * @return true if the path is a JAR or ZIP archive file, otherwise false.
+     */
     public static boolean isArchive(Path path) {
-        return path.toString().toLowerCase(Locale.ROOT).endsWith(".jar");
+        String archivePath = path.toString().toLowerCase(Locale.ROOT);
+        return archivePath.endsWith(".jar") || archivePath.endsWith(".zip");
     }
 
+    /**
+     * Return whether a path corresponds to a java class file.
+     *
+     * @param path the path to validate.
+     * @return true if the path is a java class file, otherwise false.
+     */
     public static boolean isClassFile(Path path) {
         return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
     }
@@ -167,6 +196,16 @@ public class PluginUtils {
         return locations;
     }
 
+    /**
+     * Given a top path in the filesystem, return a list of paths to archives 
(JAR or ZIP
+     * files) contained under this top path. If the top path contains only 
java class files,
+     * return the top path itself. This method follows symbolic links to 
discover archives and
+     * returns such archives as absolute paths.
+     *
+     * @param topPath the path to use as root of plugin search.
+     * @return a list of potential plugin paths, or empty list if no such 
paths exist.
+     * @throws IOException
+     */
     public static List<Path> pluginUrls(Path topPath) throws IOException {
         boolean containsClassFiles = false;
         Set<Path> archives = new HashSet<>();
@@ -193,7 +232,13 @@ public class PluginUtils {
 
                 Path adjacent = neighbors.next();
                 if (Files.isSymbolicLink(adjacent)) {
-                    Path absolute = 
Files.readSymbolicLink(adjacent).toRealPath();
+                    Path symlink = Files.readSymbolicLink(adjacent);
+                    // if symlink is absolute resolve() returns the absolute 
symlink itself
+                    Path parent = adjacent.getParent();
+                    if (parent == null) {
+                        continue;
+                    }
+                    Path absolute = parent.resolve(symlink).toRealPath();
                     if (Files.exists(absolute)) {
                         adjacent = absolute;
                     } else {
@@ -232,10 +277,23 @@ public class PluginUtils {
         return Arrays.asList(archives.toArray(new Path[0]));
     }
 
+    /**
+     * Return the simple class name of a plugin as {@code String}.
+     *
+     * @param plugin the plugin descriptor.
+     * @return the plugin's simple class name.
+     */
     public static String simpleName(PluginDesc<?> plugin) {
         return plugin.pluginClass().getSimpleName();
     }
 
+    /**
+     * Remove the plugin type name at the end of a plugin class name, if such 
suffix is present.
+     * This method is meant to be used to extract plugin aliases.
+     *
+     * @param plugin the plugin descriptor.
+     * @return the pruned simple class name of the plugin.
+     */
     public static String prunedName(PluginDesc<?> plugin) {
         // It's currently simpler to switch on type than do pattern matching.
         switch (plugin.type()) {
@@ -248,6 +306,14 @@ public class PluginUtils {
         }
     }
 
+    /**
+     * Verify whether a given plugin's alias matches another alias in a 
collection of plugins.
+     *
+     * @param alias the plugin descriptor to test for alias matching.
+     * @param plugins the collection of plugins to test against.
+     * @param <U> the plugin type.
+     * @return false if a match was found in the collection, otherwise true.
+     */
     public static <U> boolean isAliasUnique(
             PluginDesc<U> alias,
             Collection<PluginDesc<U>> plugins

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ec6765b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index a49e54c..f9532a6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -21,15 +21,26 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class PluginUtilsTest {
     @Rule
     public TemporaryFolder rootDir = new TemporaryFolder();
+    private Path pluginPath;
 
     @Before
     public void setUp() throws Exception {
+        pluginPath = rootDir.newFolder("plugins").toPath().toRealPath();
     }
 
     @Test
@@ -126,4 +137,134 @@ public class PluginUtilsTest {
                 "org.apache.kafka.connect.storage.StringConverter")
         );
     }
+
+    @Test
+    public void testEmptyPluginUrls() throws Exception {
+        assertEquals(Collections.<Path>emptyList(), 
PluginUtils.pluginUrls(pluginPath));
+    }
+
+    @Test
+    public void testEmptyStructurePluginUrls() throws Exception {
+        createBasicDirectoryLayout();
+        assertEquals(Collections.<Path>emptyList(), 
PluginUtils.pluginUrls(pluginPath));
+    }
+
+    @Test
+    public void testPluginUrlsWithJars() throws Exception {
+        createBasicDirectoryLayout();
+
+        List<Path> expectedUrls = createBasicExpectedUrls();
+
+        assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
+    }
+
+    @Test
+    public void testPluginUrlsWithZips() throws Exception {
+        createBasicDirectoryLayout();
+
+        List<Path> expectedUrls = new ArrayList<>();
+        
expectedUrls.add(Files.createFile(pluginPath.resolve("connectorA/my-sink.zip")));
+        
expectedUrls.add(Files.createFile(pluginPath.resolve("connectorB/a-source.zip")));
+        
expectedUrls.add(Files.createFile(pluginPath.resolve("transformC/simple-transform.zip")));
+        expectedUrls.add(Files.createFile(
+                pluginPath.resolve("transformC/deps/another-transform.zip"))
+        );
+
+        assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
+    }
+
+    @Test
+    public void testPluginUrlsWithClasses() throws Exception {
+        
Files.createDirectories(pluginPath.resolve("org/apache/kafka/converters"));
+        
Files.createDirectories(pluginPath.resolve("com/mycompany/transforms"));
+        Files.createDirectories(pluginPath.resolve("edu/research/connectors"));
+        
Files.createFile(pluginPath.resolve("org/apache/kafka/converters/README.txt"));
+        
Files.createFile(pluginPath.resolve("org/apache/kafka/converters/AlienFormat.class"));
+        
Files.createDirectories(pluginPath.resolve("com/mycompany/transforms/Blackhole.class"));
+        
Files.createDirectories(pluginPath.resolve("edu/research/connectors/HalSink.class"));
+
+        List<Path> expectedUrls = new ArrayList<>();
+        expectedUrls.add(pluginPath);
+
+        assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
+    }
+
+    @Test
+    public void testPluginUrlsWithAbsoluteSymlink() throws Exception {
+        createBasicDirectoryLayout();
+
+        Path anotherPath = 
rootDir.newFolder("moreplugins").toPath().toRealPath();
+        Files.createDirectories(anotherPath.resolve("connectorB-deps"));
+        Files.createSymbolicLink(
+                pluginPath.resolve("connectorB/deps/symlink"),
+                anotherPath.resolve("connectorB-deps")
+        );
+
+        List<Path> expectedUrls = createBasicExpectedUrls();
+        
expectedUrls.add(Files.createFile(anotherPath.resolve("connectorB-deps/converter.jar")));
+
+        assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
+    }
+
+    @Test
+    public void testPluginUrlsWithRelativeSymlinkBackwards() throws Exception {
+        createBasicDirectoryLayout();
+
+        Path anotherPath = 
rootDir.newFolder("moreplugins").toPath().toRealPath();
+        Files.createDirectories(anotherPath.resolve("connectorB-deps"));
+        Files.createSymbolicLink(
+                pluginPath.resolve("connectorB/deps/symlink"),
+                Paths.get("../../../moreplugins/connectorB-deps")
+        );
+
+        List<Path> expectedUrls = createBasicExpectedUrls();
+        
expectedUrls.add(Files.createFile(anotherPath.resolve("connectorB-deps/converter.jar")));
+
+        assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
+    }
+
+    @Test
+    public void testPluginUrlsWithRelativeSymlinkForwards() throws Exception {
+        // Since this test case defines a relative symlink within an already 
included path, the main
+        // assertion of this test is absence of exceptions and correct 
resolution of paths.
+        createBasicDirectoryLayout();
+        Files.createDirectories(pluginPath.resolve("connectorB/deps/more"));
+        Files.createSymbolicLink(
+                pluginPath.resolve("connectorB/deps/symlink"),
+                Paths.get("more")
+        );
+
+        List<Path> expectedUrls = createBasicExpectedUrls();
+        expectedUrls.add(
+                
Files.createFile(pluginPath.resolve("connectorB/deps/more/converter.jar"))
+        );
+
+        assertUrls(expectedUrls, PluginUtils.pluginUrls(pluginPath));
+    }
+
+    private void createBasicDirectoryLayout() throws IOException {
+        Files.createDirectories(pluginPath.resolve("connectorA"));
+        Files.createDirectories(pluginPath.resolve("connectorB/deps"));
+        Files.createDirectories(pluginPath.resolve("transformC/deps"));
+        Files.createDirectories(pluginPath.resolve("transformC/more-deps"));
+        
Files.createFile(pluginPath.resolve("transformC/more-deps/README.txt"));
+    }
+
+    private List<Path> createBasicExpectedUrls() throws IOException {
+        List<Path> expectedUrls = new ArrayList<>();
+        
expectedUrls.add(Files.createFile(pluginPath.resolve("connectorA/my-sink.jar")));
+        
expectedUrls.add(Files.createFile(pluginPath.resolve("connectorB/a-source.jar")));
+        
expectedUrls.add(Files.createFile(pluginPath.resolve("transformC/simple-transform.jar")));
+        expectedUrls.add(Files.createFile(
+                pluginPath.resolve("transformC/deps/another-transform.jar"))
+        );
+        return expectedUrls;
+    }
+
+    private void assertUrls(List<Path> expected, List<Path> actual) {
+        List<Path> actualCopy = new ArrayList<>(actual);
+        Collections.sort(expected);
+        Collections.sort(actualCopy);
+        assertEquals(expected, actualCopy);
+    }
 }

Reply via email to