This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 77cf7e2 DRILL-7317: Close ClassLoaders used for udf jars uploading when closing FunctionImplementationRegistry 77cf7e2 is described below commit 77cf7e2ee61fb40e7efd85148ac76947d13dda38 Author: Volodymyr Vysotskyi <vvo...@gmail.com> AuthorDate: Fri Jul 5 16:13:54 2019 +0300 DRILL-7317: Close ClassLoaders used for udf jars uploading when closing FunctionImplementationRegistry - Fix issue with caching DrillMergeProjectRule and FunctionImplementationRegistry when different drillbits are started within the same JVM --- .../expr/fn/FunctionImplementationRegistry.java | 3 +- .../expr/fn/registry/FunctionRegistryHolder.java | 75 ++++++++++++++++++---- .../expr/fn/registry/LocalFunctionRegistry.java | 7 +- .../planner/logical/DrillMergeProjectRule.java | 6 +- 4 files changed, 70 insertions(+), 21 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java index 30c194a..4210067 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java @@ -455,7 +455,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au return RunTimeScan.dynamicPackageScan(drillConfig, Sets.newHashSet(urls)); } finally { if (markerFileConnection instanceof JarURLConnection) { - ((JarURLConnection) markerFile.openConnection()).getJarFile().close(); + ((JarURLConnection) markerFileConnection).getJarFile().close(); } } } @@ -592,6 +592,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au */ @Override public void close() { + localFunctionRegistry.close(); if (deleteTmpDir) { FileUtils.deleteQuietly(tmpDir); } else { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java index d0383e9..f378d45 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java @@ -29,6 +29,8 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -83,7 +85,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * <li><b>function holder for upper(VARCHAR-REQUIRED)</b> is {@link DrillFuncHolder} initiated for each function.</li> * */ -public class FunctionRegistryHolder { +public class FunctionRegistryHolder implements AutoCloseable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FunctionRegistryHolder.class); @@ -376,23 +378,13 @@ public class FunctionRegistryHolder { return; } + boolean isClosed = false; for (Map.Entry<String, Queue<String>> functionEntry : jar.entrySet()) { final String function = functionEntry.getKey(); Map<String, DrillFuncHolder> functionHolders = functions.get(function); Queue<String> functionSignatures = functionEntry.getValue(); - for (Map.Entry<String, DrillFuncHolder> entry : functionHolders.entrySet()) { - if (functionSignatures.contains(entry.getKey())) { - ClassLoader classLoader = entry.getValue().getClassLoader(); - if (classLoader instanceof AutoCloseable) { - try { - ((AutoCloseable) classLoader).close(); - } catch (Exception e) { - logger.warn("Problem during closing class loader", e); - } - } - break; - } - } + // closes class loader only one time + isClosed = !isClosed && closeClassLoader(function, functionSignatures); functionHolders.keySet().removeAll(functionSignatures); if (functionHolders.isEmpty()) { @@ -400,4 +392,59 @@ public class FunctionRegistryHolder { } } } + + @Override + public void close() { + try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) { + jars.forEach((jarName, jar) -> { + if (!LocalFunctionRegistry.BUILT_IN.equals(jarName)) { + for (Map.Entry<String, Queue<String>> functionEntry : jar.entrySet()) { + if (closeClassLoader(functionEntry.getKey(), functionEntry.getValue())) { + // class loader is closed, iterates to another jar + break; + } + } + } + }); + } + } + + /** + * Produces search of {@link DrillFuncHolder} which corresponds to specified {@code String functionName} + * with signature from {@code Queue<String> functionSignatures}, + * closes its class loader if {@link DrillFuncHolder} is found and returns true. Otherwise false is returned. + * + * @param functionName name of the function + * @param functionSignatures function signatures + * @return {@code true} if {@link DrillFuncHolder} was found and attempted to close class loader disregarding the result + */ + private boolean closeClassLoader(String functionName, Queue<String> functionSignatures) { + return getDrillFuncHolder(functionName, functionSignatures) + .map(drillFuncHolder -> { + ClassLoader classLoader = drillFuncHolder.getClassLoader(); + try { + ((AutoCloseable) classLoader).close(); + } catch (Exception e) { + logger.warn("Problem during closing class loader", e); + } + return true; + }) + .orElse(false); + } + + /** + * Produces search of {@link DrillFuncHolder} which corresponds to specified {@code String functionName} + * with signature from {@code Queue<String> functionSignatures} and returns first found instance. + * + * @param functionName name of the function + * @param functionSignatures function signatures + * @return {@link Optional} with first found {@link DrillFuncHolder} instance + */ + private Optional<DrillFuncHolder> getDrillFuncHolder(String functionName, Queue<String> functionSignatures) { + Map<String, DrillFuncHolder> functionHolders = functions.get(functionName); + return functionSignatures.stream() + .map(functionHolders::get) + .filter(Objects::nonNull) + .findAny(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java index f96b1fb..cba900f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java @@ -53,7 +53,7 @@ import org.apache.drill.exec.planner.sql.DrillSqlOperatorWithoutInference; /** * Registry of Drill functions. */ -public class LocalFunctionRegistry { +public class LocalFunctionRegistry implements AutoCloseable { public static final String BUILT_IN = "built-in"; @@ -356,4 +356,9 @@ public class LocalFunctionRegistry { } } } + + @Override + public void close() { + registryHolder.close(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java index f63759a..9801011 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillMergeProjectRule.java @@ -47,15 +47,11 @@ import java.util.List; public class DrillMergeProjectRule extends RelOptRule { private FunctionImplementationRegistry functionRegistry; - private static DrillMergeProjectRule INSTANCE = null; private final boolean force; public static DrillMergeProjectRule getInstance(boolean force, ProjectFactory pFactory, FunctionImplementationRegistry functionRegistry) { - if (INSTANCE == null) { - INSTANCE = new DrillMergeProjectRule(force, pFactory, functionRegistry); - } - return INSTANCE; + return new DrillMergeProjectRule(force, pFactory, functionRegistry); } private DrillMergeProjectRule(boolean force, ProjectFactory pFactory,