This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e01ccc2 Initial code dump for dynamically loading pinot plugins
(#4899)
e01ccc2 is described below
commit e01ccc2b98a342dce7f3bd0d547cf7c3c66035e7
Author: Kishore Gopalakrishna <[email protected]>
AuthorDate: Sat Dec 7 23:05:01 2019 -0800
Initial code dump for dynamically loading pinot plugins (#4899)
* Initial code dump for dynamically loading pinot plugins
* Adding license header
* Adding more methods to PluginManager
* Addressing comments, added a test case
---
pinot-record-readers/pom.xml | 1 +
.../java/org/apache/pinot/spi/plugin/Plugin.java | 48 ++++++
.../apache/pinot/spi/plugin/PluginClassLoader.java | 149 ++++++++++++++++++
.../org/apache/pinot/spi/plugin/PluginManager.java | 175 +++++++++++++++++++++
.../apache/pinot/spi/plugin/PluginManagerTest.java | 93 +++++++++++
pinot-spi/src/test/resources/TestRecordReader.java | 61 +++++++
6 files changed, 527 insertions(+)
diff --git a/pinot-record-readers/pom.xml b/pinot-record-readers/pom.xml
index 1dc63dc..96c9578 100644
--- a/pinot-record-readers/pom.xml
+++ b/pinot-record-readers/pom.xml
@@ -50,6 +50,7 @@
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-spi</artifactId>
+ <scope>provided</scope>
</dependency>
<!-- test -->
<dependency>
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/Plugin.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/Plugin.java
new file mode 100644
index 0000000..312e25e
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/Plugin.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.plugin;
+
+import java.util.Objects;
+
+
+/**
+ * Identifies a plugin by its name.
+ *
+ * <p>Two instances are equal when they are of exactly the same class and carry equal names, which
+ * makes instances usable as keys in hash-based collections.
+ */
+public class Plugin {
+
+  // Final because the name feeds equals()/hashCode(): a key whose hash changes after it has been
+  // inserted into a hash-based map can no longer be looked up.
+  final String _name;
+
+  /**
+   * @param name the plugin name; {@code null} is accepted and only equals another {@code null} name
+   */
+  public Plugin(String name) {
+    _name = name;
+  }
+
+  /**
+   * @return {@code true} when {@code o} is a {@code Plugin} of the same runtime class with an
+   *     equal name
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Plugin plugin = (Plugin) o;
+    return Objects.equals(_name, plugin._name);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(_name);
+  }
+
+  /**
+   * @return a short description containing the plugin name, intended for logs and error messages
+   */
+  @Override
+  public String toString() {
+    return "Plugin{name=" + _name + "}";
+  }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginClassLoader.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginClassLoader.java
new file mode 100644
index 0000000..b58b29f
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginClassLoader.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.plugin;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
+
+public class PluginClassLoader extends URLClassLoader {
+
+ private final ClassLoader sysClzLoader;
+
+ public PluginClassLoader(URL[] urls, ClassLoader parent) {
+ super(urls, parent);
+ sysClzLoader = getSystemClassLoader();
+ URLClassLoader classLoader = (URLClassLoader)
ClassLoader.getSystemClassLoader();
+ Method method = null;
+ try {
+ method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
+
+ } catch (NoSuchMethodException e) {
+ //this should never happen
+ ExceptionUtils.rethrow(e);
+ }
+ method.setAccessible(true);
+ for (URL url : urls) {
+ try {
+ method.invoke(classLoader, url);
+ } catch (Exception e) {
+ ExceptionUtils.rethrow(e);
+ }
+ }
+ }
+
+ @Override
+ protected Class<?> loadClass(String name, boolean resolve)
+ throws ClassNotFoundException {
+ // has the class loaded already?
+ Class<?> loadedClass = findLoadedClass(name);
+ if (loadedClass == null) {
+ try {
+ if (sysClzLoader != null) {
+ loadedClass = sysClzLoader.loadClass(name);
+ }
+ } catch (ClassNotFoundException ex) {
+ // class not found in system class loader... silently skipping
+ }
+
+ try {
+ // find the class from given jar urls as in first constructor
parameter.
+ if (loadedClass == null) {
+ loadedClass = findClass(name);
+ }
+ } catch (ClassNotFoundException e) {
+ // class is not found in the given urls.
+ // Let's try it in parent classloader.
+ // If class is still not found, then this method will throw class not
found ex.
+ loadedClass = super.loadClass(name, resolve);
+ }
+ }
+
+ if (resolve) { // marked to resolve
+ resolveClass(loadedClass);
+ }
+ return loadedClass;
+ }
+
+ @Override
+ public Enumeration<URL> getResources(String name)
+ throws IOException {
+ List<URL> allRes = new LinkedList<>();
+
+ // load resources from sys class loader
+ Enumeration<URL> sysResources = sysClzLoader.getResources(name);
+ if (sysResources != null) {
+ while (sysResources.hasMoreElements()) {
+ allRes.add(sysResources.nextElement());
+ }
+ }
+
+ // load resource from this classloader
+ Enumeration<URL> thisRes = findResources(name);
+ if (thisRes != null) {
+ while (thisRes.hasMoreElements()) {
+ allRes.add(thisRes.nextElement());
+ }
+ }
+
+ // then try finding resources from parent classloaders
+ Enumeration<URL> parentRes = super.findResources(name);
+ if (parentRes != null) {
+ while (parentRes.hasMoreElements()) {
+ allRes.add(parentRes.nextElement());
+ }
+ }
+
+ return new Enumeration<URL>() {
+ Iterator<URL> it = allRes.iterator();
+
+ @Override
+ public boolean hasMoreElements() {
+ return it.hasNext();
+ }
+
+ @Override
+ public URL nextElement() {
+ return it.next();
+ }
+ };
+ }
+
+ @Override
+ public URL getResource(String name) {
+ URL res = null;
+ if (sysClzLoader != null) {
+ res = sysClzLoader.getResource(name);
+ }
+ if (res == null) {
+ res = findResource(name);
+ }
+ if (res == null) {
+ res = super.getResource(name);
+ }
+ return res;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
new file mode 100644
index 0000000..d6b5458
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.plugin;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+
+
+/**
+ * Process-wide registry of plugin class loaders, keyed by plugin name.
+ *
+ * <p>Class names accepted by the single-string methods come in two forms:
+ * <ul>
+ *   <li>{@code com.x.y.Foo} - resolved through the {@link #DEFAULT_PLUGIN_NAME} class loader</li>
+ *   <li>{@code pluginName:com.x.y.Foo} - resolved through the class loader registered for
+ *       {@code pluginName}</li>
+ * </ul>
+ */
+public class PluginManager {
+
+  public static final String DEFAULT_PLUGIN_NAME = "DEFAULT";
+  static final PluginManager PLUGIN_MANAGER = new PluginManager();
+
+  Map<Plugin, PluginClassLoader> _registry = new HashMap<>();
+
+  private PluginManager() {
+    // The default entry has no jars of its own, so lookups go to the surrounding class path.
+    _registry.put(new Plugin(DEFAULT_PLUGIN_NAME), createClassLoader(Collections.emptyList()));
+  }
+
+  /**
+   * Registers a plugin whose classes come from every {@code .jar} file found under
+   * {@code directory}, searched recursively. A plugin already registered under the same name is
+   * replaced.
+   *
+   * @param pluginName the name the plugin is registered under
+   * @param directory the directory to scan for jar files
+   */
+  public void load(String pluginName, File directory) {
+    Collection<File> jarFiles = FileUtils.listFiles(directory, new String[]{"jar"}, true);
+    Collection<URL> urlList = new ArrayList<>();
+    for (File jarFile : jarFiles) {
+      try {
+        urlList.add(jarFile.toURI().toURL());
+      } catch (MalformedURLException ignored) {
+        // Best effort: a jar whose path cannot be expressed as a URL is left out rather than
+        // failing the whole plugin.
+      }
+    }
+    _registry.put(new Plugin(pluginName), createClassLoader(urlList));
+  }
+
+  /**
+   * Builds a class loader over the given jar URLs, parented to this class's own loader.
+   */
+  private PluginClassLoader createClassLoader(Collection<URL> urlList) {
+    URL[] urls = urlList.toArray(new URL[0]);
+    // Always sort to make the behavior predictable. URL does not implement Comparable, so an
+    // explicit ordering is required; natural-order sorting fails as soon as there are two jars.
+    Arrays.sort(urls, (left, right) -> left.toExternalForm().compareTo(right.toExternalForm()));
+    return new PluginClassLoader(urls, this.getClass().getClassLoader());
+  }
+
+  /**
+   * Splits {@code className} at the first {@code ':'} into plugin name and class name; without a
+   * separator the default plugin name is used and the class name is returned unchanged.
+   *
+   * @return a two-element array: {@code [pluginName, realClassName]}
+   */
+  private static String[] splitPluginAndClass(String className) {
+    int separator = className.indexOf(':');
+    if (separator < 0) {
+      return new String[]{DEFAULT_PLUGIN_NAME, className};
+    }
+    return new String[]{className.substring(0, separator), className.substring(separator + 1)};
+  }
+
+  /**
+   * Looks up the class loader registered for {@code pluginName}.
+   *
+   * @throws ClassNotFoundException if no plugin is registered under that name
+   */
+  private PluginClassLoader classLoaderFor(String pluginName)
+      throws ClassNotFoundException {
+    PluginClassLoader classLoader = _registry.get(new Plugin(pluginName));
+    if (classLoader == null) {
+      throw new ClassNotFoundException("No plugin registered under name '" + pluginName + "'");
+    }
+    return classLoader;
+  }
+
+  /**
+   * Loads a class. The class name can be in either of the following formats:
+   * <ul>
+   *   <li>{@code com.x.y.foo} loads the class in the default class path</li>
+   *   <li>{@code pluginName:com.x.y.foo} loads the class in the plugin specific classloader</li>
+   * </ul>
+   *
+   * @param className the plain or plugin-prefixed class name
+   * @return the loaded class
+   * @throws ClassNotFoundException if the plugin is not registered or the class cannot be found
+   */
+  public Class<?> loadClass(String className)
+      throws ClassNotFoundException {
+    String[] parts = splitPluginAndClass(className);
+    return loadClass(parts[0], parts[1]);
+  }
+
+  /**
+   * Loads a class using the plugin specific class loader.
+   *
+   * @param pluginName the name the plugin was registered under
+   * @param className the binary name of the class, without plugin prefix
+   * @return the loaded and resolved class
+   * @throws ClassNotFoundException if the plugin is not registered or the class cannot be found
+   */
+  public Class<?> loadClass(String pluginName, String className)
+      throws ClassNotFoundException {
+    return classLoaderFor(pluginName).loadClass(className, true);
+  }
+
+  /**
+   * Creates an instance of {@code className} through its public no-argument constructor. The
+   * class name can be in either of the following formats:
+   * <ul>
+   *   <li>{@code com.x.y.foo} loads the class in the default class path</li>
+   *   <li>{@code pluginName:com.x.y.foo} loads the class in the plugin specific classloader</li>
+   * </ul>
+   *
+   * @param className the plain or plugin-prefixed class name
+   * @param <T> the type the caller expects; the cast is unchecked
+   * @return the new instance
+   * @throws Exception if the class cannot be loaded or instantiated
+   */
+  public <T> T createInstance(String className)
+      throws Exception {
+    return createInstance(className, new Class[]{}, new Object[]{});
+  }
+
+  /**
+   * Creates an instance of {@code className} through the public constructor matching
+   * {@code argTypes}. The class name can be in either of the following formats:
+   * <ul>
+   *   <li>{@code com.x.y.foo} loads the class in the default class path</li>
+   *   <li>{@code pluginName:com.x.y.foo} loads the class in the plugin specific classloader</li>
+   * </ul>
+   *
+   * @param className the plain or plugin-prefixed class name
+   * @param argTypes the constructor parameter types
+   * @param argValues the constructor arguments
+   * @param <T> the type the caller expects; the cast is unchecked
+   * @return the new instance
+   * @throws Exception if the class cannot be loaded or instantiated
+   */
+  public <T> T createInstance(String className, Class[] argTypes, Object[] argValues)
+      throws Exception {
+    String[] parts = splitPluginAndClass(className);
+    return createInstance(parts[0], parts[1], argTypes, argValues);
+  }
+
+  /**
+   * Creates an instance of {@code className} through its public no-argument constructor, using
+   * the class loader specific to the plugin.
+   *
+   * @param pluginName the name the plugin was registered under
+   * @param className the binary name of the class, without plugin prefix
+   * @param <T> the type the caller expects; the cast is unchecked
+   * @return the new instance
+   * @throws Exception if the class cannot be loaded or instantiated
+   */
+  public <T> T createInstance(String pluginName, String className)
+      throws Exception {
+    return createInstance(pluginName, className, new Class[]{}, new Object[]{});
+  }
+
+  /**
+   * Creates an instance of {@code className} through the public constructor matching
+   * {@code argTypes}, using the class loader specific to the plugin.
+   *
+   * @param pluginName the name the plugin was registered under
+   * @param className the binary name of the class, without plugin prefix
+   * @param argTypes the constructor parameter types
+   * @param argValues the constructor arguments
+   * @param <T> the type the caller expects; the cast is unchecked
+   * @return the new instance, never {@code null}
+   * @throws Exception if the plugin is not registered, the class or constructor cannot be found,
+   *     or the constructor itself fails
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public <T> T createInstance(String pluginName, String className, Class[] argTypes, Object[] argValues)
+      throws Exception {
+    Class<?> loadedClass = loadClass(pluginName, className);
+    // getConstructor never returns null; it throws NoSuchMethodException when nothing matches.
+    Constructor<?> constructor = loadedClass.getConstructor(argTypes);
+    return (T) constructor.newInstance(argValues);
+  }
+
+  /**
+   * @return the shared {@code PluginManager} instance
+   */
+  public static PluginManager get() {
+    return PLUGIN_MANAGER;
+  }
+}
+
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java
new file mode 100644
index 0000000..ac5a305
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.plugin;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.URL;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import javax.tools.JavaCompiler;
+import javax.tools.ToolProvider;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class PluginManagerTest {
+
+ private File tempDir;
+ private String jarFile;
+ private File jarDirFile;
+
+ @BeforeClass
+ public void setup() {
+
+ tempDir = new File(System.getProperty("java.io.tmpdir"),
"pinot-plugin-test");
+ tempDir.delete();
+ tempDir.mkdirs();
+
+ String jarDir = tempDir + "/" + "test-record-reader";
+ jarFile = jarDir + "/" + "test-record-reader.jar";
+ jarDirFile = new File(jarDir);
+ jarDirFile.mkdirs();
+ }
+
+ @Test
+ public void testSimple()
+ throws Exception {
+ JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+ URL javaFile =
Thread.currentThread().getContextClassLoader().getResource("TestRecordReader.java");
+ if (javaFile != null) {
+ compiler.run(null, null, null, javaFile.getFile(), "-d",
tempDir.getAbsolutePath());
+
+ URL classFile =
Thread.currentThread().getContextClassLoader().getResource("TestRecordReader.class");
+
+ if (classFile != null) {
+ JarOutputStream jos = new JarOutputStream(new
FileOutputStream(jarFile));
+ jos.putNextEntry(new JarEntry(new
File(classFile.getFile()).getName()));
+ jos.write(FileUtils.readFileToByteArray(new
File(classFile.getFile())));
+ jos.closeEntry();
+ jos.close();
+
+ PluginManager.get().load("test-record-reader", jarDirFile);
+
+ RecordReader testRecordReader =
PluginManager.get().createInstance("test-record-reader", "TestRecordReader");
+ testRecordReader.init(null, null, null);
+ int count = 0;
+ while (testRecordReader.hasNext()) {
+ GenericRow row = testRecordReader.next();
+ count++;
+ }
+
+ Assert.assertEquals(count, 10);
+ }
+ }
+ }
+
+ @AfterClass
+ public void tearDown() {
+ tempDir.delete();
+ FileUtils.deleteQuietly(jarDirFile);
+ }
+}
diff --git a/pinot-spi/src/test/resources/TestRecordReader.java
b/pinot-spi/src/test/resources/TestRecordReader.java
new file mode 100644
index 0000000..8f212a1
--- /dev/null
+++ b/pinot-spi/src/test/resources/TestRecordReader.java
@@ -0,0 +1,61 @@
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;
+
+
+/**
+ * In-memory {@link RecordReader} used as a test fixture. Each call to {@code init} appends ten
+ * rows, each holding a single {@code "key"} column with the values {@code "value-0"} to
+ * {@code "value-9"}; the data file and reader config arguments are not read.
+ */
+public class TestRecordReader implements RecordReader {
+
+  List<GenericRow> _rows = new ArrayList<>();
+  Iterator<GenericRow> _iterator;
+  Schema _schema;
+
+  /**
+   * Remembers the schema, appends the ten generated rows and positions the reader at the first
+   * row.
+   */
+  public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+    _schema = schema;
+    int index = 0;
+    while (index < 10) {
+      GenericRow generated = new GenericRow();
+      generated.putValue("key", "value-" + index);
+      _rows.add(generated);
+      index++;
+    }
+    _iterator = _rows.iterator();
+  }
+
+  /** Reports whether another row is available. */
+  public boolean hasNext() {
+    return _iterator.hasNext();
+  }
+
+  /** Returns the next stored row. */
+  public GenericRow next()
+      throws IOException {
+    return _iterator.next();
+  }
+
+  /** Returns the next stored row; the {@code reuse} argument is ignored. */
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    return _iterator.next();
+  }
+
+  /** Restarts iteration from the first stored row. */
+  public void rewind()
+      throws IOException {
+    _iterator = _rows.iterator();
+  }
+
+  /** Returns the schema passed to {@code init}. */
+  public Schema getSchema() {
+    return _schema;
+  }
+
+  /** Nothing to release. */
+  public void close() {
+  }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]