This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0c4953c4c5ff583d88f1686cd96fd7e7be9d8f11 Author: Stefan Richter <s.rich...@data-artisans.com> AuthorDate: Tue Apr 9 10:37:03 2019 +0200 [FLINK-11952][2/4] Introduce basic plugin mechanism for Flink The mechanism uses child-first classloading and creates classloaders from jars that are discovered from a directory hierarchy. --- .../apache/flink/core/fs/FileSystemFactory.java | 12 +- .../flink/core/fs/UnsupportedSchemeFactory.java | 6 - .../core/fs/local/LocalFileSystemFactory.java | 6 - .../core/plugin/DirectoryBasedPluginFinder.java | 103 ++++++++++++++++ .../Plugin.java} | 33 ++--- .../apache/flink/core/plugin/PluginDescriptor.java | 67 ++++++++++ .../org/apache/flink/core/plugin/PluginFinder.java | 37 ++++++ .../org/apache/flink/core/plugin/PluginLoader.java | 94 ++++++++++++++ .../apache/flink/core/plugin/PluginManager.java | 77 ++++++++++++ .../org/apache/flink/core/plugin/PluginUtils.java | 54 ++++++++ .../TemporaryClassLoaderContext.java} | 33 ++--- .../plugin/DirectoryBasedPluginFinderTest.java | 137 +++++++++++++++++++++ .../plugin/TemporaryClassLoaderContextTest.java | 46 +++++++ .../testutils/EntropyInjectingTestFileSystem.java | 5 - .../org/apache/flink/testutils/TestFileSystem.java | 10 +- .../flink/runtime/fs/maprfs/MapRFsFactory.java | 6 - flink-tests/pom.xml | 31 +++++ .../src/test/assembly/test-plugin-a-assembly.xml | 43 +++++++ .../src/test/assembly/test-plugin-b-assembly.xml | 43 +++++++ .../org/apache/flink/test/plugin/OtherTestSpi.java | 28 +++++ .../apache/flink/test/plugin/PluginLoaderTest.java | 71 +++++++++++ .../flink/test/plugin/PluginManagerTest.java | 105 ++++++++++++++++ .../apache/flink/test/plugin/PluginTestBase.java | 54 ++++++++ .../java/org/apache/flink/test/plugin/TestSpi.java | 28 +++++ .../test/plugin/jar/plugina/DynamicClassA.java | 31 ++--- .../test/plugin/jar/plugina/TestServiceA.java | 34 +++-- .../test/plugin/jar/pluginb/OtherTestServiceB.java | 30 ++--- .../test/plugin/jar/pluginb/TestServiceB.java | 30 ++--- .../plugin-a/org.apache.flink.test.plugin.TestSpi | 16 +++ .../org.apache.flink.test.plugin.OtherTestSpi | 16 +++ .../plugin-b/org.apache.flink.test.plugin.TestSpi | 16 +++ 31 files changed, 1130 insertions(+), 172 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java index 8a35471..eecf6f1 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.core.fs; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.plugin.Plugin; import java.io.IOException; import java.net.URI; @@ -31,7 +32,7 @@ import java.net.URI; * creating file systems via {@link #create(URI)}. */ @PublicEvolving -public interface FileSystemFactory { +public interface FileSystemFactory extends Plugin { /** * Gets the scheme of the file system created by this factory. @@ -39,15 +40,6 @@ public interface FileSystemFactory { String getScheme(); /** - * Applies the given configuration to this factory. All future file system - * instantiations via {@link #create(URI)} should take the configuration into - * account. - * - * @param config The configuration to apply. - */ - void configure(Configuration config); - - /** * Creates a new file system for the given file system URI. * The URI describes the type of file system (via its scheme) and optionally the * authority (for example the host) of the file system. diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java index c2cb2d5..e873e63 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java @@ -19,7 +19,6 @@ package org.apache.flink.core.fs; import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.Configuration; import javax.annotation.Nullable; @@ -54,11 +53,6 @@ class UnsupportedSchemeFactory implements FileSystemFactory { } @Override - public void configure(Configuration config) { - // nothing to do here - } - - @Override public FileSystem create(URI fsUri) throws IOException { if (exceptionCause == null) { throw new UnsupportedFileSystemSchemeException(exceptionMessage); diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java index 785391a..5c3b7fb 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java @@ -19,7 +19,6 @@ package org.apache.flink.core.fs.local; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystemFactory; @@ -37,11 +36,6 @@ public class LocalFileSystemFactory implements FileSystemFactory { } @Override - public void configure(Configuration config) { - // the local file system takes no configuration, so nothing to do here - } - - @Override public FileSystem create(URI fsUri) { return LocalFileSystem.getSharedInstance(); } diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java b/flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java new file mode 100644 index 0000000..5472c48 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.apache.flink.util.function.FunctionUtils; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.PathMatcher; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.stream.Collectors; + +/** + * This class is used to create a collection of {@link PluginDescriptor} based on directory structure for a given plugin + * root folder. + * + * <p>The expected structure is as follows: the given plugins root folder, containing the plugins folder. One plugin folder + * contains all resources (jar files) belonging to a plugin. The name of the plugin folder becomes the plugin id. + * <pre> + * plugins-root-folder/ + * |------------plugin-a/ (folder of plugin a) + * | |-plugin-a-1.jar (the jars containing the classes of plugin a) + * | |-plugin-a-2.jar + * | |-... + * | + * |------------plugin-b/ + * | |-plugin-b-1.jar + * ... |-... + * </pre> + */ +public class DirectoryBasedPluginFinder implements PluginFinder { + + /** Pattern to match jar files in a directory. */ + private static final String JAR_MATCHER_PATTERN = "glob:**.jar"; + + /** Root directory to the plugin folders. */ + private final Path pluginsRootDir; + + /** Matcher for jar files in the filesystem of the root folder. */ + private final PathMatcher jarFileMatcher; + + public DirectoryBasedPluginFinder(Path pluginsRootDir) { + this.pluginsRootDir = pluginsRootDir; + this.jarFileMatcher = pluginsRootDir.getFileSystem().getPathMatcher(JAR_MATCHER_PATTERN); + } + + @Override + public Collection<PluginDescriptor> findPlugins() throws IOException { + + if (!Files.isDirectory(pluginsRootDir)) { + throw new IOException("Plugins root directory [" + pluginsRootDir + "] does not exist!"); + } + + return Files.list(pluginsRootDir) + .filter((Path path) -> Files.isDirectory(path)) + .map(FunctionUtils.uncheckedFunction(this::createPluginDescriptorForSubDirectory)) + .collect(Collectors.toList()); + } + + private PluginDescriptor createPluginDescriptorForSubDirectory(Path subDirectory) throws IOException { + URL[] urls = createJarURLsFromDirectory(subDirectory); + Arrays.sort(urls, Comparator.comparing(URL::toString)); + //TODO: This class could be extended to parse exclude-pattern from a optional text files in the plugin directories. + return new PluginDescriptor( + subDirectory.getFileName().toString(), + urls, + new String[0]); + } + + private URL[] createJarURLsFromDirectory(Path subDirectory) throws IOException { + URL[] urls = Files.list(subDirectory) + .filter((Path p) -> Files.isRegularFile(p) && jarFileMatcher.matches(p)) + .map(FunctionUtils.uncheckedFunction((Path p) -> p.toUri().toURL())) + .toArray(URL[]::new); + + if (urls.length < 1) { + throw new IOException("Cannot find any jar files for plugin in directory [" + subDirectory + "]." + + " Please provide the jar files for the plugin or delete the directory."); + } + + return urls; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/plugin/Plugin.java similarity index 57% copy from flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java copy to flink-core/src/main/java/org/apache/flink/core/plugin/Plugin.java index 785391a..0edf1b1 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/Plugin.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,33 +16,22 @@ * limitations under the License. */ -package org.apache.flink.core.fs.local; +package org.apache.flink.core.plugin; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.FileSystemFactory; - -import java.net.URI; /** - * A factory for the {@link LocalFileSystem}. + * Interface for plugins. Plugins typically extend this interface in their SPI and the concrete implementations of a + * service then implement the SPI contract. */ @PublicEvolving -public class LocalFileSystemFactory implements FileSystemFactory { - - @Override - public String getScheme() { - return LocalFileSystem.getLocalFsURI().getScheme(); - } - - @Override - public void configure(Configuration config) { - // the local file system takes no configuration, so nothing to do here - } +public interface Plugin { - @Override - public FileSystem create(URI fsUri) { - return LocalFileSystem.getSharedInstance(); - } + /** + * Optional method for plugins to pick up settings from the configuration. + * + * @param config The configuration to apply to the plugin. + */ + default void configure(Configuration config) {} } diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginDescriptor.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginDescriptor.java new file mode 100644 index 0000000..93e01bf --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginDescriptor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import java.net.URL; +import java.util.Arrays; + +/** + * Descriptive meta information for a plugin. + */ +public class PluginDescriptor { + + /** Unique identifier of the plugin. */ + private final String pluginId; + + /** URLs to the plugin resources code. Usually this contains URLs of the jars that will be loaded for the plugin. */ + private final URL[] pluginResourceURLs; + + /** + * String patterns of classes that should be excluded from loading out of the plugin resources. See + * {@link org.apache.flink.util.ChildFirstClassLoader}'s field alwaysParentFirstPatterns. + */ + private final String[] loaderExcludePatterns; + + public PluginDescriptor(String pluginId, URL[] pluginResourceURLs, String[] loaderExcludePatterns) { + this.pluginId = pluginId; + this.pluginResourceURLs = pluginResourceURLs; + this.loaderExcludePatterns = loaderExcludePatterns; + } + + public String getPluginId() { + return pluginId; + } + + public URL[] getPluginResourceURLs() { + return pluginResourceURLs; + } + + public String[] getLoaderExcludePatterns() { + return loaderExcludePatterns; + } + + @Override + public String toString() { + return "PluginDescriptor{" + + "pluginId='" + pluginId + '\'' + + ", pluginResourceURLs=" + Arrays.toString(pluginResourceURLs) + + ", loaderExcludePatterns=" + Arrays.toString(loaderExcludePatterns) + + '}'; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginFinder.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginFinder.java new file mode 100644 index 0000000..45cb33f --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginFinder.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import java.io.IOException; +import java.util.Collection; + +/** + * Implementations of this interface provide mechanisms to locate plugins and create corresponding + * {@link PluginDescriptor} objects. The result can then be used to initialize a {@link PluginLoader}. + */ +public interface PluginFinder { + + /** + * Find plugins and return a corresponding collection of {@link PluginDescriptor} instances. + * + * @return a collection of {@link PluginDescriptor} instances for all found plugins. + * @throws IOException thrown if a problem occurs during plugin search. + */ + Collection<PluginDescriptor> findPlugins() throws IOException; +} diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java new file mode 100644 index 0000000..70468bf --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.ChildFirstClassLoader; + +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Iterator; +import java.util.ServiceLoader; + +/** + * A {@link PluginLoader} is used by the {@link PluginManager} to load a single plugin. It is essentially a combination + * of a {@link ChildFirstClassLoader} and {@link ServiceLoader}. This class can locate and load service implementations + * from the plugin for a given SPI. The {@link PluginDescriptor}, which among other information contains the resource + * URLs, is provided at construction. + */ +@ThreadSafe +public class PluginLoader { + + /** Classloader which is used to load the plugin classes. We expect this classloader is thread-safe.*/ + private final ClassLoader pluginClassLoader; + + @VisibleForTesting + public PluginLoader(PluginDescriptor pluginDescriptor, ClassLoader parentClassLoader) { + this.pluginClassLoader = + new ChildFirstClassLoader( + pluginDescriptor.getPluginResourceURLs(), + parentClassLoader, + pluginDescriptor.getLoaderExcludePatterns()); + } + + /** + * Returns in iterator over all available implementations of the given service interface (SPI) for the plugin. + * + * @param service the service interface (SPI) for which implementations are requested. + * @param <P> Type of the requested plugin service. + * @return An iterator of all implementations of the given service interface that could be loaded from the plugin. + */ + public <P extends Plugin> Iterator<P> load(Class<P> service) { + try (TemporaryClassLoaderContext classLoaderContext = new TemporaryClassLoaderContext(pluginClassLoader)) { + return new ContextClassLoaderSettingIterator<>( + ServiceLoader.load(service, pluginClassLoader).iterator(), + pluginClassLoader); + } + } + + /** + * Wrapper for the service iterator. The wrapper will set/unset the context classloader to the plugin classloader + * around the point where elements are returned. + * + * @param <P> type of the iterated plugin element. + */ + static class ContextClassLoaderSettingIterator<P extends Plugin> implements Iterator<P> { + + private final Iterator<P> delegate; + private final ClassLoader pluginClassLoader; + + ContextClassLoaderSettingIterator(Iterator<P> delegate, ClassLoader pluginClassLoader) { + this.delegate = delegate; + this.pluginClassLoader = pluginClassLoader; + } + + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public P next() { + try (TemporaryClassLoaderContext classLoaderContext = new TemporaryClassLoaderContext(pluginClassLoader)) { + return delegate.next(); + } + } + } + +} diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java new file mode 100644 index 0000000..00971fa --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.apache.flink.annotation.Internal; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; + +import javax.annotation.concurrent.ThreadSafe; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; + +/** + * Manager class and entry-point for the plugin mechanism in Flink. + */ +@Internal +@ThreadSafe +public class PluginManager { + + /** Parent-classloader to all classloader that are used for plugin loading. We expect that this is thread-safe. */ + private final ClassLoader parentClassLoader; + + /** A collection of descriptions of all plugins known to this plugin manager. */ + private final Collection<PluginDescriptor> pluginDescriptors; + + public PluginManager(Collection<PluginDescriptor> pluginDescriptors) { + this(pluginDescriptors, PluginManager.class.getClassLoader()); + } + + public PluginManager(Collection<PluginDescriptor> pluginDescriptors, ClassLoader parentClassLoader) { + this.pluginDescriptors = pluginDescriptors; + this.parentClassLoader = parentClassLoader; + } + + /** + * Returns in iterator over all available implementations of the given service interface (SPI) in all the plugins + * known to this plugin manager instance. + * + * @param service the service interface (SPI) for which implementations are requested. + * @param <P> Type of the requested plugin service. + * @return Iterator over all implementations of the given service that could be loaded from all known plugins. + */ + public <P extends Plugin> Iterator<P> load(Class<P> service) { + ArrayList<Iterator<P>> combinedIterators = new ArrayList<>(pluginDescriptors.size()); + for (PluginDescriptor pluginDescriptor : pluginDescriptors) { + PluginLoader pluginLoader = new PluginLoader(pluginDescriptor, parentClassLoader); + combinedIterators.add(pluginLoader.load(service)); + } + return Iterators.concat(combinedIterators.iterator()); + } + + @Override + public String toString() { + return "PluginManager{" + + "parentClassLoader=" + parentClassLoader + + ", pluginDescriptors=" + pluginDescriptors + + '}'; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java new file mode 100644 index 0000000..17e2ae4 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; + +/** + * Utility functions for the plugin mechanism. + */ +public final class PluginUtils { + + private PluginUtils() { + throw new AssertionError("Singleton class."); + } + + public static PluginManager createPluginManagerFromRootFolder(Optional<Path> pluginsRootPath) { + Collection<PluginDescriptor> pluginDescriptorsForDirectory; + + if (pluginsRootPath.isPresent()) { + try { + pluginDescriptorsForDirectory = + new DirectoryBasedPluginFinder(pluginsRootPath.get()).findPlugins(); + } catch (IOException e) { + throw new FlinkRuntimeException("Exception when trying to initialize plugin system.", e); + } + } else { + pluginDescriptorsForDirectory = Collections.emptyList(); + } + + return new PluginManager(pluginDescriptorsForDirectory); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/plugin/TemporaryClassLoaderContext.java similarity index 51% copy from flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java copy to flink-core/src/main/java/org/apache/flink/core/plugin/TemporaryClassLoaderContext.java index 785391a..eeadae8 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/plugin/TemporaryClassLoaderContext.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,33 +16,24 @@ * limitations under the License. */ -package org.apache.flink.core.fs.local; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.FileSystemFactory; - -import java.net.URI; +package org.apache.flink.core.plugin; /** - * A factory for the {@link LocalFileSystem}. + * Utility class to temporarily change the context classloader. The previous context classloader is restored on + * {@link #close()}. */ -@PublicEvolving -public class LocalFileSystemFactory implements FileSystemFactory { +public final class TemporaryClassLoaderContext implements AutoCloseable { - @Override - public String getScheme() { - return LocalFileSystem.getLocalFsURI().getScheme(); - } + /** The previous context class loader to restore on {@link #close()}. */ + private final ClassLoader toRestore; - @Override - public void configure(Configuration config) { - // the local file system takes no configuration, so nothing to do here + public TemporaryClassLoaderContext(ClassLoader temporaryContextClassLoader) { + this.toRestore = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(temporaryContextClassLoader); } @Override - public FileSystem create(URI fsUri) { - return LocalFileSystem.getSharedInstance(); + public void close() { + Thread.currentThread().setContextClassLoader(toRestore); } } diff --git a/flink-core/src/test/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinderTest.java b/flink-core/src/test/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinderTest.java new file mode 100644 index 0000000..6688e41 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinderTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.apache.flink.util.Preconditions; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.fail; + +/** + * Test for {@link DirectoryBasedPluginFinder}. + */ +public class DirectoryBasedPluginFinderTest { + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void createPluginDescriptorsForDirectory() throws Exception { + File rootFolder = temporaryFolder.newFolder(); + PluginFinder descriptorsFactory = + new DirectoryBasedPluginFinder(rootFolder.toPath()); + Collection<PluginDescriptor> actual = descriptorsFactory.findPlugins(); + + Assert.assertTrue("empty root dir -> expected no actual", actual.isEmpty()); + + List<File> subDirs = Stream.of("A", "B", "C") + .map(s -> new File(rootFolder, s)) + .collect(Collectors.toList()); + + for (File subDir : subDirs) { + Preconditions.checkState(subDir.mkdirs()); + } + + try { + descriptorsFactory.findPlugins(); + fail("all empty plugin sub-dirs"); + } catch (RuntimeException expected) { + Assert.assertTrue(expected.getCause() instanceof IOException); + } + + for (File subDir : subDirs) { + // we create a file and another subfolder to check that they are ignored + Preconditions.checkState(new File(subDir, "ignore-test.zip").createNewFile()); + Preconditions.checkState(new File(subDir, "ignore-dir").mkdirs()); + } + + try { + descriptorsFactory.findPlugins(); + fail("still no jars in plugin sub-dirs"); + } catch (RuntimeException expected) { + Assert.assertTrue(expected.getCause() instanceof IOException); + } + + List<PluginDescriptor> expected = new ArrayList<>(3); + + for (int i = 0; i < subDirs.size(); ++i) { + File subDir = subDirs.get(i); + URL[] jarURLs = new URL[i + 1]; + + for (int j = 0; j <= i; ++j) { + File file = new File(subDir, "jar-file-" + j + ".jar"); + Preconditions.checkState(file.createNewFile()); + jarURLs[j] = file.toURI().toURL(); + } + + Arrays.sort(jarURLs, Comparator.comparing(URL::toString)); + expected.add(new PluginDescriptor(subDir.getName(), jarURLs, new String[0])); + } + + actual = descriptorsFactory.findPlugins(); + + Assert.assertTrue(equalsIgnoreOrder(expected, new ArrayList<>(actual))); + } + + private boolean equalsIgnoreOrder(List<PluginDescriptor> a, List<PluginDescriptor> b) { + + if (a.size() != b.size()) { + return false; + } + + final Comparator<PluginDescriptor> comparator = Comparator.comparing(PluginDescriptor::getPluginId); + + a.sort(comparator); + b.sort(comparator); + + final Iterator<PluginDescriptor> iterA = a.iterator(); + final Iterator<PluginDescriptor> iterB = b.iterator(); + + while (iterA.hasNext()) { + if (!equals(iterA.next(), iterB.next())) { + return false; + } + } + return true; + } + + private static boolean equals(@Nonnull PluginDescriptor a, @Nonnull PluginDescriptor b) { + return a.getPluginId().equals(b.getPluginId()) + && Arrays.deepEquals(a.getPluginResourceURLs(), b.getPluginResourceURLs()) + && Arrays.deepEquals(a.getLoaderExcludePatterns(), b.getLoaderExcludePatterns()); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/core/plugin/TemporaryClassLoaderContextTest.java b/flink-core/src/test/java/org/apache/flink/core/plugin/TemporaryClassLoaderContextTest.java new file mode 100644 index 0000000..1eaf369 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/plugin/TemporaryClassLoaderContextTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.plugin; + +import org.apache.flink.util.ChildFirstClassLoader; + +import org.junit.Assert; +import org.junit.Test; + +import java.net.URL; + +/** + * Test for {@link TemporaryClassLoaderContext}. + */ +public class TemporaryClassLoaderContextTest { + + @Test + public void testTemporaryClassLoaderContext() { + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + + final ChildFirstClassLoader temporaryClassLoader = + new ChildFirstClassLoader(new URL[0], contextClassLoader, new String[0]); + + try (TemporaryClassLoaderContext classLoaderContext = new TemporaryClassLoaderContext(temporaryClassLoader)) { + Assert.assertEquals(temporaryClassLoader, Thread.currentThread().getContextClassLoader()); + } + + Assert.assertEquals(contextClassLoader, Thread.currentThread().getContextClassLoader()); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java b/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java index ef1575b..1df47e2 100644 --- a/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java +++ b/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java @@ -19,7 +19,6 @@ package org.apache.flink.testutils; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.EntropyInjectingFileSystem; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystemFactory; @@ -54,10 +53,6 @@ public class EntropyInjectingTestFileSystem extends LocalFileSystem implements E } @Override - public void configure(final Configuration config) { - } - - @Override public FileSystem create(final URI fsUri) { return new EntropyInjectingTestFileSystem(); } diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java index b799152..6460c46 100644 --- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java +++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java @@ -18,10 +18,6 @@ package org.apache.flink.testutils; -import java.io.IOException; -import java.net.URI; - -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; @@ -30,6 +26,9 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileStatus; import org.apache.flink.core.fs.local.LocalFileSystem; +import java.io.IOException; +import java.net.URI; + /** * A test file system. This also has a service entry in the test * resources, to be loaded during tests. @@ -91,9 +90,6 @@ public class TestFileSystem extends LocalFileSystem { } @Override - public void configure(Configuration config) {} - - @Override public FileSystem create(URI fsUri) throws IOException { return new TestFileSystem(); } diff --git a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java index e163f63..d738198 100644 --- a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java +++ b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.fs.maprfs; -import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystemFactory; @@ -48,11 +47,6 @@ public class MapRFsFactory implements FileSystemFactory { } @Override - public void configure(Configuration config) { - // nothing to configure based on the configuration here - } - - @Override public FileSystem create(URI fsUri) throws IOException { checkNotNull(fsUri, "fsUri"); diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 9fa652d..de1c205 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -412,6 +412,37 @@ under the License. <artifactId>maven-assembly-plugin</artifactId> <version>2.4</version><!--$NO-MVN-MAN-VER$--> <executions> + + <execution> + <id>create-plugin-a-jar</id> + <phase>process-test-classes</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <finalName>plugin-a</finalName> + <attach>false</attach> + <descriptors> + <descriptor>src/test/assembly/test-plugin-a-assembly.xml</descriptor> + </descriptors> + </configuration> + </execution> + + <execution> + <id>create-plugin-b-jar</id> + <phase>process-test-classes</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <finalName>plugin-b</finalName> + <attach>false</attach> + <descriptors> + <descriptor>src/test/assembly/test-plugin-b-assembly.xml</descriptor> + </descriptors> + </configuration> + </execution> + <execution> <id>create-kmeans-jar</id> <phase>process-test-classes</phase> diff --git a/flink-tests/src/test/assembly/test-plugin-a-assembly.xml b/flink-tests/src/test/assembly/test-plugin-a-assembly.xml new file mode 100644 index 0000000..7b9c175 --- /dev/null +++ b/flink-tests/src/test/assembly/test-plugin-a-assembly.xml @@ -0,0 +1,43 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. + +--> + +<assembly> + <id>test-jar</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${project.build.testOutputDirectory}</directory> + <outputDirectory>/</outputDirectory> + <!-- the service impl --> + <includes> + <include>org/apache/flink/test/plugin/jar/plugina/TestServiceA.class</include> + <include>org/apache/flink/test/plugin/jar/plugina/DynamicClassA.class</include> + </includes> + </fileSet> + <fileSet> + <!-- declaring the service impl --> + <directory>${project.basedir}/src/test/resources/plugin-test/plugin-a</directory> + <outputDirectory>/META-INF/services</outputDirectory> + </fileSet> + </fileSets> +</assembly> diff --git a/flink-tests/src/test/assembly/test-plugin-b-assembly.xml b/flink-tests/src/test/assembly/test-plugin-b-assembly.xml new file mode 100644 index 0000000..c15a1e4 --- /dev/null +++ b/flink-tests/src/test/assembly/test-plugin-b-assembly.xml @@ -0,0 +1,43 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. + +--> + +<assembly> + <id>test-jar</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${project.build.testOutputDirectory}</directory> + <outputDirectory>/</outputDirectory> + <!-- the service impl --> + <includes> + <include>org/apache/flink/test/plugin/jar/pluginb/TestServiceB.class</include> + <include>org/apache/flink/test/plugin/jar/pluginb/OtherTestServiceB.class</include> + </includes> + </fileSet> + <fileSet> + <!-- declaring the service impl --> + <directory>${project.basedir}/src/test/resources/plugin-test/plugin-b</directory> + <outputDirectory>/META-INF/services</outputDirectory> + </fileSet> + </fileSets> +</assembly> diff --git a/flink-tests/src/test/java/org/apache/flink/test/plugin/OtherTestSpi.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/OtherTestSpi.java new file mode 100644 index 0000000..b65859d --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/OtherTestSpi.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.plugin; + +import org.apache.flink.core.plugin.Plugin; + +/** + * Another service interface for tests of plugin mechanism. + */ +public interface OtherTestSpi extends Plugin { + String otherTestMethod(); +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java new file mode 100644 index 0000000..2460ad9 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.plugin; + +import org.apache.flink.core.plugin.PluginDescriptor; +import org.apache.flink.core.plugin.PluginLoader; +import org.apache.flink.test.plugin.jar.plugina.TestServiceA; + +import org.junit.Assert; +import org.junit.Test; + +import java.net.URL; +import java.util.Iterator; + +/** + * Test for {@link PluginLoader}. + */ +public class PluginLoaderTest extends PluginTestBase{ + + @Test + public void testPluginLoading() throws Exception { + + final URL classpathA = createPluginJarURLFromString(PLUGIN_A); + + PluginDescriptor pluginDescriptorA = new PluginDescriptor("A", new URL[]{classpathA}, new String[0]); + final PluginLoader pluginLoaderA = new PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER); + + Iterator<TestSpi> testSpiIteratorA = pluginLoaderA.load(TestSpi.class); + + Assert.assertTrue(testSpiIteratorA.hasNext()); + + TestSpi testSpiA = testSpiIteratorA.next(); + + Assert.assertFalse(testSpiIteratorA.hasNext()); + + Assert.assertNotNull(testSpiA.testMethod()); + + Assert.assertEquals(TestServiceA.class.getCanonicalName(), testSpiA.getClass().getCanonicalName()); + Assert.assertNotEquals(PARENT_CLASS_LOADER, testSpiA.getClass().getClassLoader()); + + // Looks strange, but we want to ensure that those classes are not instance of each other because they were + // loaded by different classloader instances because the plugin loader uses child-before-parent order. + Assert.assertFalse(testSpiA instanceof TestServiceA); + + // In the following we check for isolation of classes between different plugin loaders. + final PluginLoader secondPluginLoaderA = new PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER); + + TestSpi secondTestSpiA = secondPluginLoaderA.load(TestSpi.class).next(); + Assert.assertNotNull(secondTestSpiA.testMethod()); + + // Again, this looks strange, but we expect classes with the same name, that are not equal. + Assert.assertEquals(testSpiA.getClass().getCanonicalName(), secondTestSpiA.getClass().getCanonicalName()); + Assert.assertNotEquals(testSpiA.getClass(), secondTestSpiA.getClass()); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java new file mode 100644 index 0000000..af5667c --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.plugin; + +import org.apache.flink.core.plugin.DirectoryBasedPluginFinder; +import org.apache.flink.core.plugin.PluginDescriptor; +import org.apache.flink.core.plugin.PluginFinder; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Set; + +/** + * Test for {@link PluginManager}. + */ +public class PluginManagerTest extends PluginTestBase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Collection<PluginDescriptor> descriptors; + + @Before + public void setup() throws Exception { + /* + * We setup a plugin directory hierarchy and utilize DirectoryBasedPluginFinder to create the + * descriptors: + * + * <pre> + * tmp/plugins-root/ + * |-------------A/ + * | |-plugin-a.jar + * | + * |-------------B/ + * |-plugin-b.jar + * </pre> + */ + final File pluginRootFolder = temporaryFolder.newFolder(); + final Path pluginRootFolderPath = pluginRootFolder.toPath(); + final File pluginAFolder = new File(pluginRootFolder, "A"); + final File pluginBFolder = new File(pluginRootFolder, "B"); + Preconditions.checkState(pluginAFolder.mkdirs()); + Preconditions.checkState(pluginBFolder.mkdirs()); + Files.copy(locateJarFile(PLUGIN_A).toPath(), Paths.get(pluginAFolder.toString(), PLUGIN_A)); + Files.copy(locateJarFile(PLUGIN_B).toPath(), Paths.get(pluginBFolder.toString(), PLUGIN_B)); + final PluginFinder descriptorsFactory = new DirectoryBasedPluginFinder(pluginRootFolderPath); + descriptors = descriptorsFactory.findPlugins(); + Preconditions.checkState(descriptors.size() == 2); + } + + @Test + public void testPluginLoading() { + + final PluginManager pluginManager = new PluginManager(descriptors, PARENT_CLASS_LOADER); + final List<TestSpi> serviceImplList = Lists.newArrayList(pluginManager.load(TestSpi.class)); + Assert.assertEquals(2, serviceImplList.size()); + + // check that all impl have unique classloader + final Set<ClassLoader> classLoaders = Collections.newSetFromMap(new IdentityHashMap<>(3)); + classLoaders.add(PARENT_CLASS_LOADER); + for (TestSpi testSpi : serviceImplList) { + Assert.assertNotNull(testSpi.testMethod()); + Assert.assertTrue(classLoaders.add(testSpi.getClass().getClassLoader())); + } + + final List<OtherTestSpi> otherServiceImplList = Lists.newArrayList(pluginManager.load(OtherTestSpi.class)); + Assert.assertEquals(1, otherServiceImplList.size()); + for (OtherTestSpi otherTestSpi : otherServiceImplList) { + Assert.assertNotNull(otherTestSpi.otherTestMethod()); + Assert.assertTrue(classLoaders.add(otherTestSpi.getClass().getClassLoader())); + } + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginTestBase.java new file mode 100644 index 0000000..7f1afe6 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginTestBase.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.plugin; + +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; + +/** + * Base class for some tests related to the plugin mechanism. Provides access to some common test resources. + */ +abstract class PluginTestBase extends TestLogger { + + /** Optional prefix to the jar folder if run from an IDE. */ + private static final String OPT_PREFIX = "target/"; + + static final String PLUGIN_A = "plugin-a-test-jar.jar"; + static final String PLUGIN_B = "plugin-b-test-jar.jar"; + static final ClassLoader PARENT_CLASS_LOADER = PluginTestBase.class.getClassLoader(); + + URL createPluginJarURLFromString(String fileString) throws MalformedURLException { + File file = locateJarFile(fileString); + return file.toURI().toURL(); + } + + static File locateJarFile(String fileString) { + File file = new File(fileString); + if (!file.exists()) { + file = new File(OPT_PREFIX + fileString); + } + Preconditions.checkState(file.exists(), "Unable to locate jar file for test: " + fileString); + return file; + } + +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/plugin/TestSpi.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/TestSpi.java new file mode 100644 index 0000000..550108f --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/TestSpi.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.plugin; + +import org.apache.flink.core.plugin.Plugin; + +/** + * Service interface for tests of plugin mechanism. + */ +public interface TestSpi extends Plugin { + String testMethod(); +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/DynamicClassA.java similarity index 50% copy from flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java copy to flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/DynamicClassA.java index 785391a..d4b5716 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java +++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/DynamicClassA.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,33 +16,18 @@ * limitations under the License. */ -package org.apache.flink.core.fs.local; +package org.apache.flink.test.plugin.jar.plugina; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.FileSystemFactory; - -import java.net.URI; +import org.apache.flink.test.plugin.TestSpi; /** - * A factory for the {@link LocalFileSystem}. + * This class exists in the test to validate that dynamic classloading ({@link Class#forName(String)} works inside of + * plugin code. */ -@PublicEvolving -public class LocalFileSystemFactory implements FileSystemFactory { - - @Override - public String getScheme() { - return LocalFileSystem.getLocalFsURI().getScheme(); - } - - @Override - public void configure(Configuration config) { - // the local file system takes no configuration, so nothing to do here - } +public class DynamicClassA implements TestSpi { @Override - public FileSystem create(URI fsUri) { - return LocalFileSystem.getSharedInstance(); + public String testMethod() { + return getClass().getName(); } } diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/TestServiceA.java similarity index 51% copy from flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java copy to flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/TestServiceA.java index 785391a..0026773 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java +++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/TestServiceA.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,33 +16,27 @@ * limitations under the License. */ -package org.apache.flink.core.fs.local; +package org.apache.flink.test.plugin.jar.plugina; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.FileSystemFactory; - -import java.net.URI; +import org.apache.flink.test.plugin.TestSpi; /** - * A factory for the {@link LocalFileSystem}. + * First implementation of {@link TestSpi}. */ -@PublicEvolving -public class LocalFileSystemFactory implements FileSystemFactory { +public class TestServiceA implements TestSpi { - @Override - public String getScheme() { - return LocalFileSystem.getLocalFsURI().getScheme(); - } + private final TestSpi dynamicDelegate; - @Override - public void configure(Configuration config) { - // the local file system takes no configuration, so nothing to do here + public TestServiceA() { + try { + dynamicDelegate = (TestSpi) Class.forName(DynamicClassA.class.getName()).newInstance(); + } catch (Exception e) { + throw new IllegalStateException("Unable to load dynamic class."); + } } @Override - public FileSystem create(URI fsUri) { - return LocalFileSystem.getSharedInstance(); + public String testMethod() { + return getClass().getName() + "(" + dynamicDelegate.testMethod() + ")"; } } diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/OtherTestServiceB.java similarity index 50% copy from flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java copy to flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/OtherTestServiceB.java index 785391a..e69af1c 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java +++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/OtherTestServiceB.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,33 +16,17 @@ * limitations under the License. */ -package org.apache.flink.core.fs.local; +package org.apache.flink.test.plugin.jar.pluginb; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.FileSystemFactory; - -import java.net.URI; +import org.apache.flink.test.plugin.OtherTestSpi; /** - * A factory for the {@link LocalFileSystem}. + * Implementation of {@link OtherTestSpi}. */ -@PublicEvolving -public class LocalFileSystemFactory implements FileSystemFactory { - - @Override - public String getScheme() { - return LocalFileSystem.getLocalFsURI().getScheme(); - } - - @Override - public void configure(Configuration config) { - // the local file system takes no configuration, so nothing to do here - } +public class OtherTestServiceB implements OtherTestSpi { @Override - public FileSystem create(URI fsUri) { - return LocalFileSystem.getSharedInstance(); + public String otherTestMethod() { + return getClass().getName(); } } diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/TestServiceB.java similarity index 50% copy from flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java copy to flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/TestServiceB.java index 785391a..efd9194 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java +++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/TestServiceB.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,33 +16,17 @@ * limitations under the License. */ -package org.apache.flink.core.fs.local; +package org.apache.flink.test.plugin.jar.pluginb; -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.FileSystemFactory; - -import java.net.URI; +import org.apache.flink.test.plugin.TestSpi; /** - * A factory for the {@link LocalFileSystem}. + * Second implementation of {@link TestSpi}. */ -@PublicEvolving -public class LocalFileSystemFactory implements FileSystemFactory { - - @Override - public String getScheme() { - return LocalFileSystem.getLocalFsURI().getScheme(); - } - - @Override - public void configure(Configuration config) { - // the local file system takes no configuration, so nothing to do here - } +public class TestServiceB implements TestSpi { @Override - public FileSystem create(URI fsUri) { - return LocalFileSystem.getSharedInstance(); + public String testMethod() { + return getClass().getName(); } } diff --git a/flink-tests/src/test/resources/plugin-test/plugin-a/org.apache.flink.test.plugin.TestSpi b/flink-tests/src/test/resources/plugin-test/plugin-a/org.apache.flink.test.plugin.TestSpi new file mode 100644 index 0000000..6d6b1de --- /dev/null +++ b/flink-tests/src/test/resources/plugin-test/plugin-a/org.apache.flink.test.plugin.TestSpi @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.test.plugin.jar.plugina.TestServiceA diff --git a/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.OtherTestSpi b/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.OtherTestSpi new file mode 100644 index 0000000..341c830 --- /dev/null +++ b/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.OtherTestSpi @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.test.plugin.jar.pluginb.OtherTestServiceB diff --git a/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.TestSpi b/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.TestSpi new file mode 100644 index 0000000..159a3c6 --- /dev/null +++ b/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.TestSpi @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.test.plugin.jar.pluginb.TestServiceB