This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c4953c4c5ff583d88f1686cd96fd7e7be9d8f11
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Tue Apr 9 10:37:03 2019 +0200

    [FLINK-11952][2/4] Introduce basic plugin mechanism for Flink
    
    The mechanism uses child-first classloading and creates classloaders from 
jars that are discovered
    from a directory hierarchy.
---
 .../apache/flink/core/fs/FileSystemFactory.java    |  12 +-
 .../flink/core/fs/UnsupportedSchemeFactory.java    |   6 -
 .../core/fs/local/LocalFileSystemFactory.java      |   6 -
 .../core/plugin/DirectoryBasedPluginFinder.java    | 103 ++++++++++++++++
 .../Plugin.java}                                   |  33 ++---
 .../apache/flink/core/plugin/PluginDescriptor.java |  67 ++++++++++
 .../org/apache/flink/core/plugin/PluginFinder.java |  37 ++++++
 .../org/apache/flink/core/plugin/PluginLoader.java |  94 ++++++++++++++
 .../apache/flink/core/plugin/PluginManager.java    |  77 ++++++++++++
 .../org/apache/flink/core/plugin/PluginUtils.java  |  54 ++++++++
 .../TemporaryClassLoaderContext.java}              |  33 ++---
 .../plugin/DirectoryBasedPluginFinderTest.java     | 137 +++++++++++++++++++++
 .../plugin/TemporaryClassLoaderContextTest.java    |  46 +++++++
 .../testutils/EntropyInjectingTestFileSystem.java  |   5 -
 .../org/apache/flink/testutils/TestFileSystem.java |  10 +-
 .../flink/runtime/fs/maprfs/MapRFsFactory.java     |   6 -
 flink-tests/pom.xml                                |  31 +++++
 .../src/test/assembly/test-plugin-a-assembly.xml   |  43 +++++++
 .../src/test/assembly/test-plugin-b-assembly.xml   |  43 +++++++
 .../org/apache/flink/test/plugin/OtherTestSpi.java |  28 +++++
 .../apache/flink/test/plugin/PluginLoaderTest.java |  71 +++++++++++
 .../flink/test/plugin/PluginManagerTest.java       | 105 ++++++++++++++++
 .../apache/flink/test/plugin/PluginTestBase.java   |  54 ++++++++
 .../java/org/apache/flink/test/plugin/TestSpi.java |  28 +++++
 .../test/plugin/jar/plugina/DynamicClassA.java     |  31 ++---
 .../test/plugin/jar/plugina/TestServiceA.java      |  34 +++--
 .../test/plugin/jar/pluginb/OtherTestServiceB.java |  30 ++---
 .../test/plugin/jar/pluginb/TestServiceB.java      |  30 ++---
 .../plugin-a/org.apache.flink.test.plugin.TestSpi  |  16 +++
 .../org.apache.flink.test.plugin.OtherTestSpi      |  16 +++
 .../plugin-b/org.apache.flink.test.plugin.TestSpi  |  16 +++
 31 files changed, 1130 insertions(+), 172 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
index 8a35471..eecf6f1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.core.fs;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.Plugin;
 
 import java.io.IOException;
 import java.net.URI;
@@ -31,7 +32,7 @@ import java.net.URI;
  * creating file systems via {@link #create(URI)}.
  */
 @PublicEvolving
-public interface FileSystemFactory {
+public interface FileSystemFactory extends Plugin {
 
        /**
         * Gets the scheme of the file system created by this factory.
@@ -39,15 +40,6 @@ public interface FileSystemFactory {
        String getScheme();
 
        /**
-        * Applies the given configuration to this factory. All future file 
system
-        * instantiations via {@link #create(URI)} should take the 
configuration into
-        * account.
-        *
-        * @param config The configuration to apply.
-        */
-       void configure(Configuration config);
-
-       /**
         * Creates a new file system for the given file system URI.
         * The URI describes the type of file system (via its scheme) and 
optionally the
         * authority (for example the host) of the file system.
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
index c2cb2d5..e873e63 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
 
 import javax.annotation.Nullable;
 
@@ -54,11 +53,6 @@ class UnsupportedSchemeFactory implements FileSystemFactory {
        }
 
        @Override
-       public void configure(Configuration config) {
-               // nothing to do here
-       }
-
-       @Override
        public FileSystem create(URI fsUri) throws IOException {
                if (exceptionCause == null) {
                        throw new 
UnsupportedFileSystemSchemeException(exceptionMessage);
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
index 785391a..5c3b7fb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.core.fs.local;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 
@@ -37,11 +36,6 @@ public class LocalFileSystemFactory implements 
FileSystemFactory {
        }
 
        @Override
-       public void configure(Configuration config) {
-               // the local file system takes no configuration, so nothing to 
do here
-       }
-
-       @Override
        public FileSystem create(URI fsUri) {
                return LocalFileSystem.getSharedInstance();
        }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java
 
b/flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java
new file mode 100644
index 0000000..5472c48
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used to create a collection of {@link PluginDescriptor} based 
on directory structure for a given plugin
+ * root folder.
+ *
+ * <p>The expected structure is as follows: the given plugins root folder, 
containing the plugins folder. One plugin folder
+ * contains all resources (jar files) belonging to a plugin. The name of the 
plugin folder becomes the plugin id.
+ * <pre>
+ * plugins-root-folder/
+ *            |------------plugin-a/ (folder of plugin a)
+ *            |                |-plugin-a-1.jar (the jars containing the 
classes of plugin a)
+ *            |                |-plugin-a-2.jar
+ *            |                |-...
+ *            |
+ *            |------------plugin-b/
+ *            |                |-plugin-b-1.jar
+ *           ...               |-...
+ * </pre>
+ */
+public class DirectoryBasedPluginFinder implements PluginFinder {
+
+       /** Pattern to match jar files in a directory. */
+       private static final String JAR_MATCHER_PATTERN = "glob:**.jar";
+
+       /** Root directory to the plugin folders. */
+       private final Path pluginsRootDir;
+
+       /** Matcher for jar files in the filesystem of the root folder. */
+       private final PathMatcher jarFileMatcher;
+
+       public DirectoryBasedPluginFinder(Path pluginsRootDir) {
+               this.pluginsRootDir = pluginsRootDir;
+               this.jarFileMatcher = 
pluginsRootDir.getFileSystem().getPathMatcher(JAR_MATCHER_PATTERN);
+       }
+
+       @Override
+       public Collection<PluginDescriptor> findPlugins() throws IOException {
+
+               if (!Files.isDirectory(pluginsRootDir)) {
+                       throw new IOException("Plugins root directory [" + 
pluginsRootDir + "] does not exist!");
+               }
+
+               return Files.list(pluginsRootDir)
+                       .filter((Path path) -> Files.isDirectory(path))
+                       
.map(FunctionUtils.uncheckedFunction(this::createPluginDescriptorForSubDirectory))
+                       .collect(Collectors.toList());
+       }
+
+       private PluginDescriptor createPluginDescriptorForSubDirectory(Path 
subDirectory) throws IOException {
+               URL[] urls = createJarURLsFromDirectory(subDirectory);
+               Arrays.sort(urls, Comparator.comparing(URL::toString));
+               //TODO: This class could be extended to parse exclude-pattern 
from a optional text files in the plugin directories.
+               return new PluginDescriptor(
+                               subDirectory.getFileName().toString(),
+                               urls,
+                               new String[0]);
+       }
+
+       private URL[] createJarURLsFromDirectory(Path subDirectory) throws 
IOException {
+               URL[] urls = Files.list(subDirectory)
+                       .filter((Path p) -> Files.isRegularFile(p) && 
jarFileMatcher.matches(p))
+                       .map(FunctionUtils.uncheckedFunction((Path p) -> 
p.toUri().toURL()))
+                       .toArray(URL[]::new);
+
+               if (urls.length < 1) {
+                       throw new IOException("Cannot find any jar files for 
plugin in directory [" + subDirectory + "]." +
+                               " Please provide the jar files for the plugin 
or delete the directory.");
+               }
+
+               return urls;
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
 b/flink-core/src/main/java/org/apache/flink/core/plugin/Plugin.java
similarity index 57%
copy from 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
copy to flink-core/src/main/java/org/apache/flink/core/plugin/Plugin.java
index 785391a..0edf1b1 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/Plugin.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,33 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.core.fs.local;
+package org.apache.flink.core.plugin;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-
-import java.net.URI;
 
 /**
- * A factory for the {@link LocalFileSystem}.
+ * Interface for plugins. Plugins typically extend this interface in their SPI 
and the concrete implementations of a
+ * service then implement the SPI contract.
  */
 @PublicEvolving
-public class LocalFileSystemFactory implements FileSystemFactory {
-
-       @Override
-       public String getScheme() {
-               return LocalFileSystem.getLocalFsURI().getScheme();
-       }
-
-       @Override
-       public void configure(Configuration config) {
-               // the local file system takes no configuration, so nothing to 
do here
-       }
+public interface Plugin {
 
-       @Override
-       public FileSystem create(URI fsUri) {
-               return LocalFileSystem.getSharedInstance();
-       }
+       /**
+        * Optional method for plugins to pick up settings from the 
configuration.
+        *
+        * @param config The configuration to apply to the plugin.
+        */
+       default void configure(Configuration config) {}
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginDescriptor.java 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginDescriptor.java
new file mode 100644
index 0000000..93e01bf
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginDescriptor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import java.net.URL;
+import java.util.Arrays;
+
+/**
+ * Descriptive meta information for a plugin.
+ */
+public class PluginDescriptor {
+
+       /** Unique identifier of the plugin. */
+       private final String pluginId;
+
+       /** URLs to the plugin resources code. Usually this contains URLs of 
the jars that will be loaded for the plugin. */
+       private final URL[] pluginResourceURLs;
+
+       /**
+        * String patterns of classes that should be excluded from loading out 
of the plugin resources. See
+        * {@link org.apache.flink.util.ChildFirstClassLoader}'s field 
alwaysParentFirstPatterns.
+        */
+       private final String[] loaderExcludePatterns;
+
+       public PluginDescriptor(String pluginId, URL[] pluginResourceURLs, 
String[] loaderExcludePatterns) {
+               this.pluginId = pluginId;
+               this.pluginResourceURLs = pluginResourceURLs;
+               this.loaderExcludePatterns = loaderExcludePatterns;
+       }
+
+       public String getPluginId() {
+               return pluginId;
+       }
+
+       public URL[] getPluginResourceURLs() {
+               return pluginResourceURLs;
+       }
+
+       public String[] getLoaderExcludePatterns() {
+               return loaderExcludePatterns;
+       }
+
+       @Override
+       public String toString() {
+               return "PluginDescriptor{" +
+                       "pluginId='" + pluginId + '\'' +
+                       ", pluginResourceURLs=" + 
Arrays.toString(pluginResourceURLs) +
+                       ", loaderExcludePatterns=" + 
Arrays.toString(loaderExcludePatterns) +
+                       '}';
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginFinder.java 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginFinder.java
new file mode 100644
index 0000000..45cb33f
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginFinder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Implementations of this interface provide mechanisms to locate plugins and 
create corresponding
+ * {@link PluginDescriptor} objects. The result can then be used to initialize 
a {@link PluginLoader}.
+ */
+public interface PluginFinder {
+
+       /**
+        * Find plugins and return a corresponding collection of {@link 
PluginDescriptor} instances.
+        *
+        * @return a collection of {@link PluginDescriptor} instances for all 
found plugins.
+        * @throws IOException thrown if a problem occurs during plugin search.
+        */
+       Collection<PluginDescriptor> findPlugins() throws IOException;
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
new file mode 100644
index 0000000..70468bf
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ChildFirstClassLoader;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+/**
+ * A {@link PluginLoader} is used by the {@link PluginManager} to load a 
single plugin. It is essentially a combination
+ * of a {@link ChildFirstClassLoader} and {@link ServiceLoader}. This class 
can locate and load service implementations
+ * from the plugin for a given SPI. The {@link PluginDescriptor}, which among 
other information contains the resource
+ * URLs, is provided at construction.
+ */
+@ThreadSafe
+public class PluginLoader {
+
+       /** Classloader which is used to load the plugin classes. We expect 
this classloader is thread-safe.*/
+       private final ClassLoader pluginClassLoader;
+
+       @VisibleForTesting
+       public PluginLoader(PluginDescriptor pluginDescriptor, ClassLoader 
parentClassLoader) {
+               this.pluginClassLoader =
+                       new ChildFirstClassLoader(
+                               pluginDescriptor.getPluginResourceURLs(),
+                               parentClassLoader,
+                               pluginDescriptor.getLoaderExcludePatterns());
+       }
+
+       /**
+        * Returns in iterator over all available implementations of the given 
service interface (SPI) for the plugin.
+        *
+        * @param service the service interface (SPI) for which implementations 
are requested.
+        * @param <P> Type of the requested plugin service.
+        * @return An iterator of all implementations of the given service 
interface that could be loaded from the plugin.
+        */
+       public <P extends Plugin> Iterator<P> load(Class<P> service) {
+               try (TemporaryClassLoaderContext classLoaderContext = new 
TemporaryClassLoaderContext(pluginClassLoader)) {
+                       return new ContextClassLoaderSettingIterator<>(
+                               ServiceLoader.load(service, 
pluginClassLoader).iterator(),
+                               pluginClassLoader);
+               }
+       }
+
+       /**
+        * Wrapper for the service iterator. The wrapper will set/unset the 
context classloader to the plugin classloader
+        * around the point where elements are returned.
+        *
+        * @param <P> type of the iterated plugin element.
+        */
+       static class ContextClassLoaderSettingIterator<P extends Plugin> 
implements Iterator<P> {
+
+               private final Iterator<P> delegate;
+               private final ClassLoader pluginClassLoader;
+
+               ContextClassLoaderSettingIterator(Iterator<P> delegate, 
ClassLoader pluginClassLoader) {
+                       this.delegate = delegate;
+                       this.pluginClassLoader = pluginClassLoader;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return delegate.hasNext();
+               }
+
+               @Override
+               public P next() {
+                       try (TemporaryClassLoaderContext classLoaderContext = 
new TemporaryClassLoaderContext(pluginClassLoader)) {
+                               return delegate.next();
+                       }
+               }
+       }
+
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java
new file mode 100644
index 0000000..00971fa
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Manager class and entry-point for the plugin mechanism in Flink.
+ */
+@Internal
+@ThreadSafe
+public class PluginManager {
+
+       /** Parent-classloader to all classloader that are used for plugin 
loading. We expect that this is thread-safe. */
+       private final ClassLoader parentClassLoader;
+
+       /** A collection of descriptions of all plugins known to this plugin 
manager. */
+       private final Collection<PluginDescriptor> pluginDescriptors;
+
+       public PluginManager(Collection<PluginDescriptor> pluginDescriptors) {
+               this(pluginDescriptors, PluginManager.class.getClassLoader());
+       }
+
+       public PluginManager(Collection<PluginDescriptor> pluginDescriptors, 
ClassLoader parentClassLoader) {
+               this.pluginDescriptors = pluginDescriptors;
+               this.parentClassLoader = parentClassLoader;
+       }
+
+       /**
+        * Returns in iterator over all available implementations of the given 
service interface (SPI) in all the plugins
+        * known to this plugin manager instance.
+        *
+        * @param service the service interface (SPI) for which implementations 
are requested.
+        * @param <P> Type of the requested plugin service.
+        * @return Iterator over all implementations of the given service that 
could be loaded from all known plugins.
+        */
+       public <P extends Plugin> Iterator<P> load(Class<P> service) {
+               ArrayList<Iterator<P>> combinedIterators = new 
ArrayList<>(pluginDescriptors.size());
+               for (PluginDescriptor pluginDescriptor : pluginDescriptors) {
+                       PluginLoader pluginLoader = new 
PluginLoader(pluginDescriptor, parentClassLoader);
+                       combinedIterators.add(pluginLoader.load(service));
+               }
+               return Iterators.concat(combinedIterators.iterator());
+       }
+
+       @Override
+       public String toString() {
+               return "PluginManager{" +
+                       "parentClassLoader=" + parentClassLoader +
+                       ", pluginDescriptors=" + pluginDescriptors +
+                       '}';
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java 
b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
new file mode 100644
index 0000000..17e2ae4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * Utility functions for the plugin mechanism.
+ */
+public final class PluginUtils {
+
+       private PluginUtils() {
+               throw new AssertionError("Singleton class.");
+       }
+
+       public static PluginManager 
createPluginManagerFromRootFolder(Optional<Path> pluginsRootPath) {
+               Collection<PluginDescriptor> pluginDescriptorsForDirectory;
+
+               if (pluginsRootPath.isPresent()) {
+                       try {
+                               pluginDescriptorsForDirectory =
+                                       new 
DirectoryBasedPluginFinder(pluginsRootPath.get()).findPlugins();
+                       } catch (IOException e) {
+                               throw new FlinkRuntimeException("Exception when 
trying to initialize plugin system.", e);
+                       }
+               } else {
+                       pluginDescriptorsForDirectory = Collections.emptyList();
+               }
+
+               return new PluginManager(pluginDescriptorsForDirectory);
+       }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
 
b/flink-core/src/main/java/org/apache/flink/core/plugin/TemporaryClassLoaderContext.java
similarity index 51%
copy from 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
copy to 
flink-core/src/main/java/org/apache/flink/core/plugin/TemporaryClassLoaderContext.java
index 785391a..eeadae8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/plugin/TemporaryClassLoaderContext.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,33 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.core.fs.local;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-
-import java.net.URI;
+package org.apache.flink.core.plugin;
 
 /**
- * A factory for the {@link LocalFileSystem}.
+ * Utility class to temporarily change the context classloader. The previous 
context classloader is restored on
+ * {@link #close()}.
  */
-@PublicEvolving
-public class LocalFileSystemFactory implements FileSystemFactory {
+public final class TemporaryClassLoaderContext implements AutoCloseable {
 
-       @Override
-       public String getScheme() {
-               return LocalFileSystem.getLocalFsURI().getScheme();
-       }
+       /** The previous context class loader to restore on {@link #close()}. */
+       private final ClassLoader toRestore;
 
-       @Override
-       public void configure(Configuration config) {
-               // the local file system takes no configuration, so nothing to 
do here
+       public TemporaryClassLoaderContext(ClassLoader 
temporaryContextClassLoader) {
+               this.toRestore = Thread.currentThread().getContextClassLoader();
+               
Thread.currentThread().setContextClassLoader(temporaryContextClassLoader);
        }
 
        @Override
-       public FileSystem create(URI fsUri) {
-               return LocalFileSystem.getSharedInstance();
+       public void close() {
+               Thread.currentThread().setContextClassLoader(toRestore);
        }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinderTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinderTest.java
new file mode 100644
index 0000000..6688e41
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinderTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Test for {@link DirectoryBasedPluginFinder}.
+ */
+public class DirectoryBasedPluginFinderTest {
+
+       @Rule
+       public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @Test
+       public void createPluginDescriptorsForDirectory() throws Exception {
+               File rootFolder = temporaryFolder.newFolder();
+               PluginFinder descriptorsFactory =
+                       new DirectoryBasedPluginFinder(rootFolder.toPath());
+               Collection<PluginDescriptor> actual = 
descriptorsFactory.findPlugins();
+
+               Assert.assertTrue("empty root dir -> expected no actual", 
actual.isEmpty());
+
+               List<File> subDirs = Stream.of("A", "B", "C")
+                       .map(s -> new File(rootFolder, s))
+                       .collect(Collectors.toList());
+
+               for (File subDir : subDirs) {
+                       Preconditions.checkState(subDir.mkdirs());
+               }
+
+               try {
+                       descriptorsFactory.findPlugins();
+                       fail("all empty plugin sub-dirs");
+               } catch (RuntimeException expected) {
+                       Assert.assertTrue(expected.getCause() instanceof 
IOException);
+               }
+
+               for (File subDir : subDirs) {
+                       // we create a file and another subfolder to check that 
they are ignored
+                       Preconditions.checkState(new File(subDir, 
"ignore-test.zip").createNewFile());
+                       Preconditions.checkState(new File(subDir, 
"ignore-dir").mkdirs());
+               }
+
+               try {
+                       descriptorsFactory.findPlugins();
+                       fail("still no jars in plugin sub-dirs");
+               } catch (RuntimeException expected) {
+                       Assert.assertTrue(expected.getCause() instanceof 
IOException);
+               }
+
+               List<PluginDescriptor> expected = new ArrayList<>(3);
+
+               for (int i = 0; i < subDirs.size(); ++i) {
+                       File subDir = subDirs.get(i);
+                       URL[] jarURLs = new URL[i + 1];
+
+                       for (int j = 0; j <= i; ++j) {
+                               File file = new File(subDir, "jar-file-" + j + 
".jar");
+                               Preconditions.checkState(file.createNewFile());
+                               jarURLs[j] = file.toURI().toURL();
+                       }
+
+                       Arrays.sort(jarURLs, 
Comparator.comparing(URL::toString));
+                       expected.add(new PluginDescriptor(subDir.getName(), 
jarURLs, new String[0]));
+               }
+
+               actual = descriptorsFactory.findPlugins();
+
+               Assert.assertTrue(equalsIgnoreOrder(expected, new 
ArrayList<>(actual)));
+       }
+
+       private boolean equalsIgnoreOrder(List<PluginDescriptor> a, 
List<PluginDescriptor> b) {
+
+               if (a.size() != b.size()) {
+                       return false;
+               }
+
+               final Comparator<PluginDescriptor> comparator = 
Comparator.comparing(PluginDescriptor::getPluginId);
+
+               a.sort(comparator);
+               b.sort(comparator);
+
+               final Iterator<PluginDescriptor> iterA = a.iterator();
+               final Iterator<PluginDescriptor> iterB = b.iterator();
+
+               while (iterA.hasNext()) {
+                       if (!equals(iterA.next(), iterB.next())) {
+                               return false;
+                       }
+               }
+               return true;
+       }
+
+       private static boolean equals(@Nonnull PluginDescriptor a, @Nonnull 
PluginDescriptor b) {
+               return a.getPluginId().equals(b.getPluginId())
+                       && Arrays.deepEquals(a.getPluginResourceURLs(), 
b.getPluginResourceURLs())
+                       && Arrays.deepEquals(a.getLoaderExcludePatterns(), 
b.getLoaderExcludePatterns());
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/plugin/TemporaryClassLoaderContextTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/plugin/TemporaryClassLoaderContextTest.java
new file mode 100644
index 0000000..1eaf369
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/core/plugin/TemporaryClassLoaderContextTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.ChildFirstClassLoader;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URL;
+
+/**
+ * Test for {@link TemporaryClassLoaderContext}.
+ */
+public class TemporaryClassLoaderContextTest {
+
+       @Test
+       public void testTemporaryClassLoaderContext() {
+               final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
+
+               final ChildFirstClassLoader temporaryClassLoader =
+                       new ChildFirstClassLoader(new URL[0], 
contextClassLoader, new String[0]);
+
+               try (TemporaryClassLoaderContext classLoaderContext = new 
TemporaryClassLoaderContext(temporaryClassLoader)) {
+                       Assert.assertEquals(temporaryClassLoader, 
Thread.currentThread().getContextClassLoader());
+               }
+
+               Assert.assertEquals(contextClassLoader, 
Thread.currentThread().getContextClassLoader());
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java
 
b/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java
index ef1575b..1df47e2 100644
--- 
a/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java
+++ 
b/flink-core/src/test/java/org/apache/flink/testutils/EntropyInjectingTestFileSystem.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.testutils;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.EntropyInjectingFileSystem;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
@@ -54,10 +53,6 @@ public class EntropyInjectingTestFileSystem extends 
LocalFileSystem implements E
                }
 
                @Override
-               public void configure(final Configuration config) {
-               }
-
-               @Override
                public FileSystem create(final URI fsUri) {
                        return new EntropyInjectingTestFileSystem();
                }
diff --git 
a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java 
b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
index b799152..6460c46 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileSystem.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.testutils;
 
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -30,6 +26,9 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileStatus;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 
+import java.io.IOException;
+import java.net.URI;
+
 /**
  * A test file system. This also has a service entry in the test
  * resources, to be loaded during tests.
@@ -91,9 +90,6 @@ public class TestFileSystem extends LocalFileSystem {
                }
 
                @Override
-               public void configure(Configuration config) {}
-
-               @Override
                public FileSystem create(URI fsUri) throws IOException {
                        return new TestFileSystem();
                }
diff --git 
a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
 
b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
index e163f63..d738198 100644
--- 
a/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
+++ 
b/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFsFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.fs.maprfs;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 
@@ -48,11 +47,6 @@ public class MapRFsFactory implements FileSystemFactory {
        }
 
        @Override
-       public void configure(Configuration config) {
-               // nothing to configure based on the configuration here
-       }
-
-       @Override
        public FileSystem create(URI fsUri) throws IOException {
                checkNotNull(fsUri, "fsUri");
 
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 9fa652d..de1c205 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -412,6 +412,37 @@ under the License.
                                <artifactId>maven-assembly-plugin</artifactId>
                                <version>2.4</version><!--$NO-MVN-MAN-VER$-->
                                <executions>
+
+                                       <execution>
+                                               <id>create-plugin-a-jar</id>
+                                               
<phase>process-test-classes</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<finalName>plugin-a</finalName>
+                                                       <attach>false</attach>
+                                                       <descriptors>
+                                                               
<descriptor>src/test/assembly/test-plugin-a-assembly.xml</descriptor>
+                                                       </descriptors>
+                                               </configuration>
+                                       </execution>
+
+                                       <execution>
+                                               <id>create-plugin-b-jar</id>
+                                               
<phase>process-test-classes</phase>
+                                               <goals>
+                                                       <goal>single</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<finalName>plugin-b</finalName>
+                                                       <attach>false</attach>
+                                                       <descriptors>
+                                                               
<descriptor>src/test/assembly/test-plugin-b-assembly.xml</descriptor>
+                                                       </descriptors>
+                                               </configuration>
+                                       </execution>
+
                                        <execution>
                                                <id>create-kmeans-jar</id>
                                                
<phase>process-test-classes</phase>
diff --git a/flink-tests/src/test/assembly/test-plugin-a-assembly.xml 
b/flink-tests/src/test/assembly/test-plugin-a-assembly.xml
new file mode 100644
index 0000000..7b9c175
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-plugin-a-assembly.xml
@@ -0,0 +1,43 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+
+<assembly>
+       <id>test-jar</id>
+       <formats>
+               <format>jar</format>
+       </formats>
+       <includeBaseDirectory>false</includeBaseDirectory>
+       <fileSets>
+               <fileSet>
+                       
<directory>${project.build.testOutputDirectory}</directory>
+                       <outputDirectory>/</outputDirectory>
+                       <!-- the service impl -->
+                       <includes>
+                               
<include>org/apache/flink/test/plugin/jar/plugina/TestServiceA.class</include>
+                               
<include>org/apache/flink/test/plugin/jar/plugina/DynamicClassA.class</include>
+                       </includes>
+               </fileSet>
+               <fileSet>
+                       <!-- declaring the service impl -->
+                       
<directory>${project.basedir}/src/test/resources/plugin-test/plugin-a</directory>
+                       <outputDirectory>/META-INF/services</outputDirectory>
+               </fileSet>
+       </fileSets>
+</assembly>
diff --git a/flink-tests/src/test/assembly/test-plugin-b-assembly.xml 
b/flink-tests/src/test/assembly/test-plugin-b-assembly.xml
new file mode 100644
index 0000000..c15a1e4
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-plugin-b-assembly.xml
@@ -0,0 +1,43 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+
+<assembly>
+       <id>test-jar</id>
+       <formats>
+               <format>jar</format>
+       </formats>
+       <includeBaseDirectory>false</includeBaseDirectory>
+       <fileSets>
+               <fileSet>
+                       
<directory>${project.build.testOutputDirectory}</directory>
+                       <outputDirectory>/</outputDirectory>
+                       <!-- the service impl -->
+                       <includes>
+                               
<include>org/apache/flink/test/plugin/jar/pluginb/TestServiceB.class</include>
+                               
<include>org/apache/flink/test/plugin/jar/pluginb/OtherTestServiceB.class</include>
+                       </includes>
+               </fileSet>
+               <fileSet>
+                       <!-- declaring the service impl -->
+                       
<directory>${project.basedir}/src/test/resources/plugin-test/plugin-b</directory>
+                       <outputDirectory>/META-INF/services</outputDirectory>
+               </fileSet>
+       </fileSets>
+</assembly>
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/plugin/OtherTestSpi.java 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/OtherTestSpi.java
new file mode 100644
index 0000000..b65859d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/OtherTestSpi.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.plugin;
+
+import org.apache.flink.core.plugin.Plugin;
+
+/**
+ * Another service interface for tests of plugin mechanism.
+ */
+public interface OtherTestSpi extends Plugin {
+       String otherTestMethod();
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java
new file mode 100644
index 0000000..2460ad9
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginLoaderTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.plugin;
+
+import org.apache.flink.core.plugin.PluginDescriptor;
+import org.apache.flink.core.plugin.PluginLoader;
+import org.apache.flink.test.plugin.jar.plugina.TestServiceA;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Iterator;
+
+/**
+ * Test for {@link PluginLoader}.
+ */
+public class PluginLoaderTest extends PluginTestBase{
+
+       @Test
+       public void testPluginLoading() throws Exception {
+
+               final URL classpathA = createPluginJarURLFromString(PLUGIN_A);
+
+               PluginDescriptor pluginDescriptorA = new PluginDescriptor("A", 
new URL[]{classpathA}, new String[0]);
+               final PluginLoader pluginLoaderA = new 
PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER);
+
+               Iterator<TestSpi> testSpiIteratorA = 
pluginLoaderA.load(TestSpi.class);
+
+               Assert.assertTrue(testSpiIteratorA.hasNext());
+
+               TestSpi testSpiA = testSpiIteratorA.next();
+
+               Assert.assertFalse(testSpiIteratorA.hasNext());
+
+               Assert.assertNotNull(testSpiA.testMethod());
+
+               Assert.assertEquals(TestServiceA.class.getCanonicalName(), 
testSpiA.getClass().getCanonicalName());
+               Assert.assertNotEquals(PARENT_CLASS_LOADER, 
testSpiA.getClass().getClassLoader());
+
+               // Looks strange, but we want to ensure that those classes are 
not instance of each other because they were
+               // loaded by different classloader instances because the plugin 
loader uses child-before-parent order.
+               Assert.assertFalse(testSpiA instanceof TestServiceA);
+
+               // In the following we check for isolation of classes between 
different plugin loaders.
+               final PluginLoader secondPluginLoaderA = new 
PluginLoader(pluginDescriptorA, PARENT_CLASS_LOADER);
+
+               TestSpi secondTestSpiA = 
secondPluginLoaderA.load(TestSpi.class).next();
+               Assert.assertNotNull(secondTestSpiA.testMethod());
+
+               // Again, this looks strange, but we expect classes with the 
same name, that are not equal.
+               Assert.assertEquals(testSpiA.getClass().getCanonicalName(), 
secondTestSpiA.getClass().getCanonicalName());
+               Assert.assertNotEquals(testSpiA.getClass(), 
secondTestSpiA.getClass());
+       }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java
new file mode 100644
index 0000000..af5667c
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginManagerTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.plugin;
+
+import org.apache.flink.core.plugin.DirectoryBasedPluginFinder;
+import org.apache.flink.core.plugin.PluginDescriptor;
+import org.apache.flink.core.plugin.PluginFinder;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Test for {@link PluginManager}.
+ */
+public class PluginManagerTest extends PluginTestBase {
+
+       @Rule
+       public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       private Collection<PluginDescriptor> descriptors;
+
+       @Before
+       public void setup() throws Exception {
+               /*
+                * We setup a plugin directory hierarchy and utilize 
DirectoryBasedPluginFinder to create the
+                * descriptors:
+                *
+                * <pre>
+                * tmp/plugins-root/
+                *          |-------------A/
+                *          |             |-plugin-a.jar
+                *          |
+                *          |-------------B/
+                *                        |-plugin-b.jar
+                * </pre>
+                */
+               final File pluginRootFolder = temporaryFolder.newFolder();
+               final Path pluginRootFolderPath = pluginRootFolder.toPath();
+               final File pluginAFolder = new File(pluginRootFolder, "A");
+               final File pluginBFolder = new File(pluginRootFolder, "B");
+               Preconditions.checkState(pluginAFolder.mkdirs());
+               Preconditions.checkState(pluginBFolder.mkdirs());
+               Files.copy(locateJarFile(PLUGIN_A).toPath(), 
Paths.get(pluginAFolder.toString(), PLUGIN_A));
+               Files.copy(locateJarFile(PLUGIN_B).toPath(), 
Paths.get(pluginBFolder.toString(), PLUGIN_B));
+               final PluginFinder descriptorsFactory = new 
DirectoryBasedPluginFinder(pluginRootFolderPath);
+               descriptors = descriptorsFactory.findPlugins();
+               Preconditions.checkState(descriptors.size() == 2);
+       }
+
+       @Test
+       public void testPluginLoading() {
+
+               final PluginManager pluginManager = new 
PluginManager(descriptors, PARENT_CLASS_LOADER);
+               final List<TestSpi> serviceImplList = 
Lists.newArrayList(pluginManager.load(TestSpi.class));
+               Assert.assertEquals(2, serviceImplList.size());
+
+               // check that all impl have unique classloader
+               final Set<ClassLoader> classLoaders = 
Collections.newSetFromMap(new IdentityHashMap<>(3));
+               classLoaders.add(PARENT_CLASS_LOADER);
+               for (TestSpi testSpi : serviceImplList) {
+                       Assert.assertNotNull(testSpi.testMethod());
+                       
Assert.assertTrue(classLoaders.add(testSpi.getClass().getClassLoader()));
+               }
+
+               final List<OtherTestSpi> otherServiceImplList = 
Lists.newArrayList(pluginManager.load(OtherTestSpi.class));
+               Assert.assertEquals(1, otherServiceImplList.size());
+               for (OtherTestSpi otherTestSpi : otherServiceImplList) {
+                       Assert.assertNotNull(otherTestSpi.otherTestMethod());
+                       
Assert.assertTrue(classLoaders.add(otherTestSpi.getClass().getClassLoader()));
+               }
+       }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginTestBase.java 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginTestBase.java
new file mode 100644
index 0000000..7f1afe6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/PluginTestBase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.plugin;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * Base class for some tests related to the plugin mechanism. Provides access 
to some common test resources.
+ */
+abstract class PluginTestBase extends TestLogger {
+
+       /** Optional prefix to the jar folder if run from an IDE. */
+       private static final String OPT_PREFIX = "target/";
+
+       static final String PLUGIN_A = "plugin-a-test-jar.jar";
+       static final String PLUGIN_B = "plugin-b-test-jar.jar";
+       static final ClassLoader PARENT_CLASS_LOADER = 
PluginTestBase.class.getClassLoader();
+
+       URL createPluginJarURLFromString(String fileString) throws 
MalformedURLException {
+               File file = locateJarFile(fileString);
+               return file.toURI().toURL();
+       }
+
+       static File locateJarFile(String fileString) {
+               File file = new File(fileString);
+               if (!file.exists()) {
+                       file = new File(OPT_PREFIX + fileString);
+               }
+               Preconditions.checkState(file.exists(), "Unable to locate jar 
file for test: " + fileString);
+               return file;
+       }
+
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/plugin/TestSpi.java 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/TestSpi.java
new file mode 100644
index 0000000..550108f
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/plugin/TestSpi.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.plugin;
+
+import org.apache.flink.core.plugin.Plugin;
+
+/**
+ * Service interface for tests of plugin mechanism.
+ */
+public interface TestSpi extends Plugin {
+       String testMethod();
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/DynamicClassA.java
similarity index 50%
copy from 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
copy to 
flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/DynamicClassA.java
index 785391a..d4b5716 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/DynamicClassA.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,33 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.core.fs.local;
+package org.apache.flink.test.plugin.jar.plugina;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-
-import java.net.URI;
+import org.apache.flink.test.plugin.TestSpi;
 
 /**
- * A factory for the {@link LocalFileSystem}.
+ * This class exists in the test to validate that dynamic classloading ({@link 
Class#forName(String)} works inside of
+ * plugin code.
  */
-@PublicEvolving
-public class LocalFileSystemFactory implements FileSystemFactory {
-
-       @Override
-       public String getScheme() {
-               return LocalFileSystem.getLocalFsURI().getScheme();
-       }
-
-       @Override
-       public void configure(Configuration config) {
-               // the local file system takes no configuration, so nothing to 
do here
-       }
+public class DynamicClassA implements TestSpi {
 
        @Override
-       public FileSystem create(URI fsUri) {
-               return LocalFileSystem.getSharedInstance();
+       public String testMethod() {
+               return getClass().getName();
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/TestServiceA.java
similarity index 51%
copy from 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
copy to 
flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/TestServiceA.java
index 785391a..0026773 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/plugina/TestServiceA.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,33 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.flink.core.fs.local;
+package org.apache.flink.test.plugin.jar.plugina;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-
-import java.net.URI;
+import org.apache.flink.test.plugin.TestSpi;
 
 /**
- * A factory for the {@link LocalFileSystem}.
+ * First implementation of {@link TestSpi}.
  */
-@PublicEvolving
-public class LocalFileSystemFactory implements FileSystemFactory {
+public class TestServiceA implements TestSpi {
 
-       @Override
-       public String getScheme() {
-               return LocalFileSystem.getLocalFsURI().getScheme();
-       }
+       private final TestSpi dynamicDelegate;
 
-       @Override
-       public void configure(Configuration config) {
-               // the local file system takes no configuration, so nothing to 
do here
+       public TestServiceA() {
+               try {
+                       dynamicDelegate = (TestSpi) 
Class.forName(DynamicClassA.class.getName()).newInstance();
+               } catch (Exception e) {
+                       throw new IllegalStateException("Unable to load dynamic 
class.");
+               }
        }
 
        @Override
-       public FileSystem create(URI fsUri) {
-               return LocalFileSystem.getSharedInstance();
+       public String testMethod() {
+               return getClass().getName() + "(" + 
dynamicDelegate.testMethod() + ")";
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/OtherTestServiceB.java
similarity index 50%
copy from 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
copy to 
flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/OtherTestServiceB.java
index 785391a..e69af1c 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/OtherTestServiceB.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,33 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.core.fs.local;
+package org.apache.flink.test.plugin.jar.pluginb;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-
-import java.net.URI;
+import org.apache.flink.test.plugin.OtherTestSpi;
 
 /**
- * A factory for the {@link LocalFileSystem}.
+ * Implementation of {@link OtherTestSpi}.
  */
-@PublicEvolving
-public class LocalFileSystemFactory implements FileSystemFactory {
-
-       @Override
-       public String getScheme() {
-               return LocalFileSystem.getLocalFsURI().getScheme();
-       }
-
-       @Override
-       public void configure(Configuration config) {
-               // the local file system takes no configuration, so nothing to 
do here
-       }
+public class OtherTestServiceB implements OtherTestSpi {
 
        @Override
-       public FileSystem create(URI fsUri) {
-               return LocalFileSystem.getSharedInstance();
+       public String otherTestMethod() {
+               return getClass().getName();
        }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/TestServiceB.java
similarity index 50%
copy from 
flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
copy to 
flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/TestServiceB.java
index 785391a..efd9194 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystemFactory.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/plugin/jar/pluginb/TestServiceB.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,33 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.core.fs.local;
+package org.apache.flink.test.plugin.jar.pluginb;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.FileSystemFactory;
-
-import java.net.URI;
+import org.apache.flink.test.plugin.TestSpi;
 
 /**
- * A factory for the {@link LocalFileSystem}.
+ * Second implementation of {@link TestSpi}.
  */
-@PublicEvolving
-public class LocalFileSystemFactory implements FileSystemFactory {
-
-       @Override
-       public String getScheme() {
-               return LocalFileSystem.getLocalFsURI().getScheme();
-       }
-
-       @Override
-       public void configure(Configuration config) {
-               // the local file system takes no configuration, so nothing to 
do here
-       }
+public class TestServiceB implements TestSpi {
 
        @Override
-       public FileSystem create(URI fsUri) {
-               return LocalFileSystem.getSharedInstance();
+       public String testMethod() {
+               return getClass().getName();
        }
 }
diff --git 
a/flink-tests/src/test/resources/plugin-test/plugin-a/org.apache.flink.test.plugin.TestSpi
 
b/flink-tests/src/test/resources/plugin-test/plugin-a/org.apache.flink.test.plugin.TestSpi
new file mode 100644
index 0000000..6d6b1de
--- /dev/null
+++ 
b/flink-tests/src/test/resources/plugin-test/plugin-a/org.apache.flink.test.plugin.TestSpi
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.test.plugin.jar.plugina.TestServiceA
diff --git 
a/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.OtherTestSpi
 
b/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.OtherTestSpi
new file mode 100644
index 0000000..341c830
--- /dev/null
+++ 
b/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.OtherTestSpi
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.test.plugin.jar.pluginb.OtherTestServiceB
diff --git 
a/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.TestSpi
 
b/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.TestSpi
new file mode 100644
index 0000000..159a3c6
--- /dev/null
+++ 
b/flink-tests/src/test/resources/plugin-test/plugin-b/org.apache.flink.test.plugin.TestSpi
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.test.plugin.jar.pluginb.TestServiceB

Reply via email to