[ https://issues.apache.org/jira/browse/DRILL-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641787#comment-16641787 ]
ASF GitHub Bot commented on DRILL-6762: --------------------------------------- asfgit closed pull request #1484: DRILL-6762: Fix dynamic UDFs versioning issue URL: https://github.com/apache/drill/pull/1484 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java index 2a331bb9107..d2dc0f8a750 100644 --- a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java +++ b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java @@ -181,4 +181,17 @@ static UserException findWrappedUserException(Throwable ex) { return (UserException) cause; } + /** + * Helps to hide checked exception from the compiler but then actually throw it. + * Is useful when implementing functional interfaces that allow checked exceptions. + * + * @param e original exception instance + * @param <E> exception type + * @throws E exception instance + */ + @SuppressWarnings("unchecked") + public static <E extends Throwable> void sneakyThrow(Throwable e) throws E { + throw (E) e; + } + } diff --git a/common/src/main/java/org/apache/drill/common/util/function/CheckedFunction.java b/common/src/main/java/org/apache/drill/common/util/function/CheckedFunction.java new file mode 100644 index 00000000000..050565e7a59 --- /dev/null +++ b/common/src/main/java/org/apache/drill/common/util/function/CheckedFunction.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.common.util.function; + +import java.util.function.Function; + +import static org.apache.drill.common.exceptions.ErrorHelper.sneakyThrow; + +/** + * Extension of {@link Function} that allows to throw checked exception. + * + * @param <T> function argument type + * @param <R> function result type + * @param <E> exception type + */ +@FunctionalInterface +public interface CheckedFunction<T, R, E extends Throwable> extends Function<T, R> { + + /** + * Overrides {@link Function#apply(Object)} method to allow calling functions that throw checked exceptions. + * Is useful when used in methods that accept {@link Function}. + * For example: {@link java.util.Map#computeIfAbsent(Object, Function)}. + * + * @param t the function argument + * @return the function result + */ + @Override + default R apply(T t) { + try { + return applyAndThrow(t); + } catch (Throwable e) { + sneakyThrow(e); + } + // should never happen + throw new RuntimeException(); + } + + /** + * Applies function to the given argument. + * + * @param t the function argument + * @return the function result + * @throws E exception in case of errors + */ + R applyAndThrow(T t) throws E; + +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java b/common/src/main/java/org/apache/drill/common/util/function/CheckedSupplier.java similarity index 92% rename from exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java rename to common/src/main/java/org/apache/drill/common/util/function/CheckedSupplier.java index b744ac8c033..60633839945 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java +++ b/common/src/main/java/org/apache/drill/common/util/function/CheckedSupplier.java @@ -15,10 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.drill.exec.util; +package org.apache.drill.common.util.function; /** - * The java standard library does not provide a lambda function interface for funtions that take no arguments, + * The java standard library does not provide a lambda function interface for functions that take no arguments, * but that throw an exception. So, we have to define our own here. * @param <T> The return type of the lambda function. * @param <E> The type of exception thrown by the lambda function. diff --git a/common/src/test/java/org/apache/drill/common/util/function/TestCheckedFunction.java b/common/src/test/java/org/apache/drill/common/util/function/TestCheckedFunction.java new file mode 100644 index 00000000000..a1ab389836e --- /dev/null +++ b/common/src/test/java/org/apache/drill/common/util/function/TestCheckedFunction.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.common.util.function; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.HashMap; +import java.util.Map; + +public class TestCheckedFunction { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testComputeIfAbsentWithCheckedFunction() { + ExceptionProducer producer = new ExceptionProducer(); + Map<String, String> map = new HashMap<>(); + String message = "Exception message"; + CheckedFunction<String, String, Exception> function = producer::failWithMessage; + + thrown.expect(Exception.class); + thrown.expectMessage(message); + + map.computeIfAbsent(message, function); + } + + private class ExceptionProducer { + + String failWithMessage(String message) throws Exception { + throw new Exception(message); + } + + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java index 135ccd4e1bb..35feaa9f282 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java @@ -25,7 +25,7 @@ /** * Returns a {@link TransientStore transient store} instance for the given configuration. * - * Note that implementors have liberty to cache previous {@link PersistentStore store} instances. + * Note that implementors have liberty to cache previous {@link TransientStore store} instances. * * @param config store configuration * @param <V> store value type diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java index f24f9aaef58..f4b83736739 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java @@ -25,11 +25,12 @@ import java.net.URL; import java.net.URLClassLoader; import java.net.URLConnection; +import java.util.ArrayList; import java.util.Enumeration; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Sets; @@ -64,7 +65,6 @@ import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.exec.store.sys.store.DataChangeVersion; import org.apache.drill.exec.util.JarUtil; import org.apache.hadoop.fs.FileSystem; @@ -83,7 +83,7 @@ private final Path localUdfDir; private boolean deleteTmpDir = false; private File tmpDir; - private List<PluggableFunctionRegistry> pluggableFuncRegistries = Lists.newArrayList(); + private List<PluggableFunctionRegistry> pluggableFuncRegistries = new ArrayList<>(); private OptionSet optionManager; private final boolean useDynamicUdfs; @@ -168,7 +168,7 @@ public void register(DrillOperatorTable operatorTable) { */ @Override public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall) { - AtomicLong version = new AtomicLong(); + AtomicInteger version = new AtomicInteger(); String newFunctionName = functionReplacement(functionCall); // Dynamic UDFS: First try with exact match. If not found, we may need to @@ -246,7 +246,7 @@ private DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType, boolean retry) { - AtomicLong version = new AtomicLong(); + AtomicInteger version = new AtomicInteger(); for (DrillFuncHolder h : localFunctionRegistry.getMethods(name, version)) { if (h.matches(returnType, argTypes)) { return h; @@ -321,19 +321,19 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry() { /** * Purpose of this method is to synchronize remote and local function registries if needed * and to inform if function registry was changed after given version. - * + * <p/> * To make synchronization as much light-weigh as possible, first only versions of both registries are checked * without any locking. If synchronization is needed, enters synchronized block to prevent others loading the same jars. * The need of synchronization is checked again (double-check lock) before comparing jars. * If any missing jars are found, they are downloaded to local udf area, each is wrapped into {@link JarScan}. * Once jar download is finished, all missing jars are registered in one batch. * In case if any errors during jars download / registration, these errors are logged. - * + * <p/> * During registration local function registry is updated with remote function registry version it is synced with. * When at least one jar of the missing jars failed to download / register, * local function registry version are not updated but jars that where successfully downloaded / registered * are added to local function registry. - * + * <p/> * If synchronization between remote and local function registry was not needed, * checks if given registry version matches latest sync version * to inform if function registry was changed after given version. @@ -342,16 +342,16 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry() { * @return true if remote and local function registries were synchronized after given version */ @SuppressWarnings("resource") - public boolean syncWithRemoteRegistry(long version) { + public boolean syncWithRemoteRegistry(int version) { // Do the version check only if a remote registry exists. It does // not exist for some JMockit-based unit tests. if (isRegistrySyncNeeded()) { synchronized (this) { - long localRegistryVersion = localFunctionRegistry.getVersion(); + int localRegistryVersion = localFunctionRegistry.getVersion(); if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion)) { DataChangeVersion remoteVersion = new DataChangeVersion(); List<String> missingJars = getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, remoteVersion); - List<JarScan> jars = Lists.newArrayList(); + List<JarScan> jars = new ArrayList<>(); if (!missingJars.isEmpty()) { logger.info("Starting dynamic UDFs lazy-init process.\n" + "The following jars are going to be downloaded and registered locally: " + missingJars); @@ -381,7 +381,7 @@ public boolean syncWithRemoteRegistry(long version) { } } } - long latestRegistryVersion = jars.size() != missingJars.size() ? + int latestRegistryVersion = jars.size() != missingJars.size() ? localRegistryVersion : remoteVersion.getVersion(); localFunctionRegistry.register(jars, latestRegistryVersion); return true; @@ -392,23 +392,38 @@ public boolean syncWithRemoteRegistry(long version) { return version != localFunctionRegistry.getVersion(); } + /** + * Checks if remote and local registries should be synchronized. + * Before comparing versions, checks if remote function registry is actually exists. + * + * @return true is local registry should be refreshed, false otherwise + */ private boolean isRegistrySyncNeeded() { + logger.trace("Has remote function registry: {}", remoteFunctionRegistry.hasRegistry()); return remoteFunctionRegistry.hasRegistry() && isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion()); } /** * Checks if local function registry should be synchronized with remote function registry. - * If remote function registry version is -1, it means that remote function registry is unreachable - * or is not configured thus we skip synchronization and return false. - * In all other cases synchronization is needed if remote and local function registries versions do not match. + * + * <ul>If remote function registry version is {@link DataChangeVersion#UNDEFINED}, + * it means that remote function registry does not support versioning + * thus we need to synchronize both registries.</ul> + * <ul>If remote function registry version is {@link DataChangeVersion#NOT_AVAILABLE}, + * it means that remote function registry is unreachable + * or is not configured thus we skip synchronization and return false.</ul> + * <ul>For all other cases synchronization is needed if remote + * and local function registries versions do not match.</ul> * * @param remoteVersion remote function registry version * @param localVersion local function registry version * @return true is local registry should be refreshed, false otherwise */ - private boolean isRegistrySyncNeeded(long remoteVersion, long localVersion) { - return remoteVersion != -1 && remoteVersion != localVersion; + private boolean isRegistrySyncNeeded(int remoteVersion, int localVersion) { + logger.trace("Compare remote [{}] and local [{}] registry versions.", remoteVersion, localVersion); + return remoteVersion == DataChangeVersion.UNDEFINED || + (remoteVersion != DataChangeVersion.NOT_AVAILABLE && remoteVersion != localVersion); } /** @@ -459,7 +474,7 @@ private ScanResult scan(ClassLoader classLoader, Path path, URL[] urls) throws I DataChangeVersion version) { List<Jar> remoteJars = remoteFunctionRegistry.getRegistry(version).getJarList(); List<String> localJars = localFunctionRegistry.getAllJarNames(); - List<String> missingJars = Lists.newArrayList(); + List<String> missingJars = new ArrayList<>(); for (Jar jar : remoteJars) { if (!localJars.contains(jar.getName())) { missingJars.add(jar.getName()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java index dc8fd74ffbc..d1d4fc94dfd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java @@ -19,18 +19,18 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap; import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.shaded.guava.com.google.common.collect.Maps; -import org.apache.drill.shaded.guava.com.google.common.collect.Queues; import org.apache.drill.common.AutoCloseables.Closeable; import org.apache.drill.common.concurrent.AutoCloseableLock; import org.apache.drill.exec.expr.fn.DrillFuncHolder; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Queue; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -45,9 +45,10 @@ * Holder is designed to allow concurrent reads and single writes to keep data consistent. * This is achieved by {@link ReadWriteLock} implementation usage. * Holder has number version which indicates remote function registry version number it is in sync with. - * + * <p/> * Structure example: * + * <pre> * JARS * built-in -> upper -> upper(VARCHAR-REQUIRED) * -> lower -> lower(VARCHAR-REQUIRED) @@ -72,12 +73,12 @@ * * custom_lower -> custom_lower(VARCHAR-REQUIRED) -> function holder for custom_lower(VARCHAR-REQUIRED) * -> custom_lower(VARCHAR-OPTIONAL) -> function holder for custom_lower(VARCHAR-OPTIONAL) - * + * </pre> * where - * First.jar is jar name represented by String - * upper is function name represented by String - * upper(VARCHAR-REQUIRED) is signature name represented by String which consist of function name, list of input parameters - * function holder for upper(VARCHAR-REQUIRED) is {@link DrillFuncHolder} initiated for each function. + * <li><b>First.jar</b> is jar name represented by {@link String}.</li> + * <li><b>upper</b> is function name represented by {@link String}.</li> + * <li><b>upper(VARCHAR-REQUIRED)</b> is signature name represented by String which consist of function name, list of input parameters.</li> + * <li><b>function holder for upper(VARCHAR-REQUIRED)</b> is {@link DrillFuncHolder} initiated for each function.</li> * */ public class FunctionRegistryHolder { @@ -88,7 +89,7 @@ private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock()); private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock()); // remote function registry number, it is in sync with - private long version; + private int version; // jar name, Map<function name, Queue<function signature> private final Map<String, Map<String, Queue<String>>> jars; @@ -97,15 +98,15 @@ private final Map<String, Map<String, DrillFuncHolder>> functions; public FunctionRegistryHolder() { - this.functions = Maps.newConcurrentMap(); - this.jars = Maps.newConcurrentMap(); + this.functions = new ConcurrentHashMap<>(); + this.jars = new ConcurrentHashMap<>(); } /** * This is read operation, so several users at a time can get this data. * @return local function registry version number */ - public long getVersion() { + public int getVersion() { try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { return version; } @@ -122,12 +123,12 @@ public long getVersion() { * * @param newJars jars and list of their function holders, each contains function name, signature and holder */ - public void addJars(Map<String, List<FunctionHolder>> newJars, long version) { + public void addJars(Map<String, List<FunctionHolder>> newJars, int version) { try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) { for (Map.Entry<String, List<FunctionHolder>> newJar : newJars.entrySet()) { String jarName = newJar.getKey(); removeAllByJar(jarName); - Map<String, Queue<String>> jar = Maps.newConcurrentMap(); + Map<String, Queue<String>> jar = new ConcurrentHashMap<>(); jars.put(jarName, jar); addFunctions(jar, newJar.getValue()); } @@ -156,7 +157,7 @@ public void removeJar(String jarName) { */ public List<String> getAllJarNames() { try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { - return Lists.newArrayList(jars.keySet()); + return new ArrayList<>(jars.keySet()); } } @@ -171,7 +172,7 @@ public void removeJar(String jarName) { public List<String> getFunctionNamesByJar(String jarName) { try (@SuppressWarnings("unused") Closeable lock = readLock.open()){ Map<String, Queue<String>> functions = jars.get(jarName); - return functions == null ? Lists.<String>newArrayList() : Lists.newArrayList(functions.keySet()); + return functions == null ? new ArrayList<>() : new ArrayList<>(functions.keySet()); } } @@ -185,14 +186,14 @@ public void removeJar(String jarName) { * @param version version holder * @return all functions which their holders */ - public ListMultimap<String, DrillFuncHolder> getAllFunctionsWithHolders(AtomicLong version) { + public ListMultimap<String, DrillFuncHolder> getAllFunctionsWithHolders(AtomicInteger version) { try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { if (version != null) { version.set(this.version); } ListMultimap<String, DrillFuncHolder> functionsWithHolders = ArrayListMultimap.create(); for (Map.Entry<String, Map<String, DrillFuncHolder>> function : functions.entrySet()) { - functionsWithHolders.putAll(function.getKey(), Lists.newArrayList(function.getValue().values())); + functionsWithHolders.putAll(function.getKey(), new ArrayList<>(function.getValue().values())); } return functionsWithHolders; } @@ -220,7 +221,7 @@ public void removeJar(String jarName) { try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { ListMultimap<String, String> functionsWithSignatures = ArrayListMultimap.create(); for (Map.Entry<String, Map<String, DrillFuncHolder>> function : functions.entrySet()) { - functionsWithSignatures.putAll(function.getKey(), Lists.newArrayList(function.getValue().keySet())); + functionsWithSignatures.putAll(function.getKey(), new ArrayList<>(function.getValue().keySet())); } return functionsWithSignatures; } @@ -236,13 +237,13 @@ public void removeJar(String jarName) { * @param version version holder * @return list of function holders */ - public List<DrillFuncHolder> getHoldersByFunctionName(String functionName, AtomicLong version) { + public List<DrillFuncHolder> getHoldersByFunctionName(String functionName, AtomicInteger version) { try (@SuppressWarnings("unused") Closeable lock = readLock.open()) { if (version != null) { version.set(this.version); } Map<String, DrillFuncHolder> holders = functions.get(functionName); - return holders == null ? Lists.<DrillFuncHolder>newArrayList() : Lists.newArrayList(holders.values()); + return holders == null ? new ArrayList<>() : new ArrayList<>(holders.values()); } } @@ -316,17 +317,13 @@ private void addFunctions(Map<String, Queue<String>> jar, List<FunctionHolder> n final String functionName = function.getName(); Queue<String> jarFunctions = jar.get(functionName); if (jarFunctions == null) { - jarFunctions = Queues.newConcurrentLinkedQueue(); + jarFunctions = new ConcurrentLinkedQueue<>(); jar.put(functionName, jarFunctions); } final String functionSignature = function.getSignature(); jarFunctions.add(functionSignature); - Map<String, DrillFuncHolder> signatures = functions.get(functionName); - if (signatures == null) { - signatures = Maps.newConcurrentMap(); - functions.put(functionName, signatures); - } + Map<String, DrillFuncHolder> signatures = functions.computeIfAbsent(functionName, k -> new ConcurrentHashMap<>()); signatures.put(functionSignature, function.getHolder()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java index 3740a6cc85d..cefbd8cf388 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java @@ -24,8 +24,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.drill.exec.store.sys.store.DataChangeVersion; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -77,13 +78,16 @@ private final FunctionRegistryHolder registryHolder; /** - * Registers all functions present in Drill classpath on start-up. All functions will be marked as built-in. - * Built-in functions are not allowed to be unregistered. Initially sync registry version will be set to 0. + * Registers all functions present in Drill classpath on start-up. + * All functions will be marked as built-in. Built-in functions are not allowed to be unregistered. + * Since local function registry version is based on remote function registry version, + * initially sync version will be set to {@link DataChangeVersion#UNDEFINED} + * to ensure that upon first check both registries would be synchronized. */ public LocalFunctionRegistry(ScanResult classpathScan) { registryHolder = new FunctionRegistryHolder(); validate(BUILT_IN, classpathScan); - register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader())), 0); + register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader())), DataChangeVersion.UNDEFINED); if (logger.isTraceEnabled()) { StringBuilder allFunctions = new StringBuilder(); for (DrillFuncHolder method: registryHolder.getAllFunctionsWithHolders().values()) { @@ -96,7 +100,7 @@ public LocalFunctionRegistry(ScanResult classpathScan) { /** * @return remote function registry version number with which local function registry is synced */ - public long getVersion() { + public int getVersion() { return registryHolder.getVersion(); } @@ -160,7 +164,7 @@ public long getVersion() { * @param jars list of jars to be registered * @param version remote function registry version number with which local function registry is synced */ - public void register(List<JarScan> jars, long version) { + public void register(List<JarScan> jars, int version) { Map<String, List<FunctionHolder>> newJars = new HashMap<>(); for (JarScan jarScan : jars) { FunctionConverter converter = new FunctionConverter(); @@ -219,7 +223,7 @@ public int size(){ * @param name function name * @return all function holders associated with the function name. Function name is case insensitive. */ - public List<DrillFuncHolder> getMethods(String name, AtomicLong version) { + public List<DrillFuncHolder> getMethods(String name, AtomicInteger version) { return registryHolder.getHoldersByFunctionName(name.toLowerCase(), version); } @@ -238,7 +242,7 @@ public int size(){ * @param operatorTable drill operator table */ public void register(DrillOperatorTable operatorTable) { - AtomicLong versionHolder = new AtomicLong(); + AtomicInteger versionHolder = new AtomicInteger(); final Map<String, Collection<DrillFuncHolder>> registeredFunctions = registryHolder.getAllFunctionsWithHolders(versionHolder).asMap(); operatorTable.setFunctionRegistryVersion(versionHolder.get()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java index 4e947656f3a..f727a9374eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java @@ -54,34 +54,36 @@ * Creates all remote registry areas at startup and validates them, * during init establishes connections with three udf related stores. * Provides tools to work with three udf related stores, gives access to remote registry areas. - * + * <p/> * There are three udf stores: - * REGISTRY - persistent store, stores remote function registry {@link Registry} under udf path + * + * <li><b>REGISTRY</b> - persistent store, stores remote function registry {@link Registry} under udf path * which contains information about all dynamically registered jars and their function signatures. - * If connection is created for the first time, puts empty remote registry. + * If connection is created for the first time, puts empty remote registry.</li> * - * UNREGISTRATION - transient store, stores information under udf/unregister path. + * <li><b>UNREGISTRATION</b> - transient store, stores information under udf/unregister path. * udf/unregister path is persistent by itself but any child created will be transient. * Whenever user submits request to unregister jar, child path with jar name is created under this store. * This store also holds unregistration listener, which notifies all drill bits when child path is created, - * so they can start local unregistration process. + * so they can start local unregistration process.</li> * - * JARS - transient store, stores information under udf/jars path. + * <li><b>JARS</b> - transient store, stores information under udf/jars path. * udf/jars path is persistent by itself but any child created will be transient. * Servers as lock, not allowing to perform any action on the same time. * There two types of actions: {@link Action#REGISTRATION} and {@link Action#UNREGISTRATION}. * Before starting any action, users tries to create child path with jar name under this store * and if such path already exists, receives action being performed on that very jar. - * When user finishes its action, he deletes child path with jar name. - * + * When user finishes its action, he deletes child path with jar name.</li> + * <p/> * There are three udf areas: - * STAGING - area where user copies binary and source jars before starting registration process. - * REGISTRY - area where registered jars are stored. - * TMP - area where source and binary jars are backed up in unique folder during registration process. + * + * <li><b>STAGING</b> - area where user copies binary and source jars before starting registration process.</li> + * <li><b>REGISTRY</b> - area where registered jars are stored.</li> + * <li><b>TMP</b> - area where source and binary jars are backed up in unique folder during registration process.</li> */ public class RemoteFunctionRegistry implements AutoCloseable { - private static final String registry_path = "registry"; + private static final String REGISTRY_PATH = "registry"; private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteFunctionRegistry.class); private static final ObjectMapper mapper = new ObjectMapper().enable(INDENT_OUTPUT); @@ -112,19 +114,19 @@ public void init(DrillConfig config, PersistentStoreProvider storeProvider, Clus * * @return remote function registry version if any, -1 otherwise */ - public long getRegistryVersion() { + public int getRegistryVersion() { DataChangeVersion version = new DataChangeVersion(); boolean contains = false; try { - contains = registry.contains(registry_path, version); + contains = registry.contains(REGISTRY_PATH, version); } catch (Exception e) { - logger.error("Problem during trying to access remote function registry [{}]", registry_path, e); + logger.error("Problem during trying to access remote function registry [{}]", REGISTRY_PATH, e); } if (contains) { return version.getVersion(); } else { - logger.error("Remote function registry [{}] is unreachable", registry_path); - return -1; + logger.error("Remote function registry [{}] is unreachable", REGISTRY_PATH); + return DataChangeVersion.NOT_AVAILABLE; } } @@ -137,11 +139,11 @@ public long getRegistryVersion() { public boolean hasRegistry() { return registry != null; } public Registry getRegistry(DataChangeVersion version) { - return registry.get(registry_path, version); + return registry.get(REGISTRY_PATH, version); } public void updateRegistry(Registry registryContent, DataChangeVersion version) throws VersionMismatchException { - registry.put(registry_path, registryContent, version); + registry.put(REGISTRY_PATH, registryContent, version); } public void submitForUnregistration(String jar) { @@ -193,7 +195,8 @@ private void prepareStores(PersistentStoreProvider storeProvider, ClusterCoordin .persist() .build(); registry = storeProvider.getOrCreateVersionedStore(registrationConfig); - registry.putIfAbsent(registry_path, Registry.getDefaultInstance()); + logger.trace("Remote function registry type: {}.", registry.getClass()); + registry.putIfAbsent(REGISTRY_PATH, Registry.getDefaultInstance()); } catch (StoreException e) { throw new DrillRuntimeException("Failure while loading remote registry.", e); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java index e1e33098aae..eb79a5a65af 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java @@ -54,7 +54,7 @@ private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithInferenceMap = ArrayListMultimap.create(); // indicates remote function registry version based on which drill operator were loaded // is used to define if we need to reload operator table in case remote function registry version has changed - private long functionRegistryVersion; + private int functionRegistryVersion; private final OptionManager systemOptionManager; @@ -70,14 +70,14 @@ public DrillOperatorTable(FunctionImplementationRegistry registry, OptionManager * * @param version registry version */ - public void setFunctionRegistryVersion(long version) { + public void setFunctionRegistryVersion(int version) { functionRegistryVersion = version; } /** * @return function registry version based on which operator table was loaded */ - public long getFunctionRegistryVersion() { + public int getFunctionRegistryVersion() { return functionRegistryVersion; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index e3cd7e460db..41faea96c13 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -21,7 +21,6 @@ import org.apache.calcite.sql.SqlDescribeSchema; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.tools.RelConversionException; import org.apache.calcite.tools.ValidationException; import org.apache.drill.common.exceptions.UserException; @@ -58,8 +57,7 @@ private DrillSqlWorker() { * @param sql sql query * @return query physical plan */ - public static PhysicalPlan getPlan(QueryContext context, String sql) throws SqlParseException, ValidationException, - ForemanSetupException { + public static PhysicalPlan getPlan(QueryContext context, String sql) throws ForemanSetupException { return getPlan(context, sql, null); } @@ -76,15 +74,18 @@ public static PhysicalPlan getPlan(QueryContext context, String sql) throws SqlP * @param textPlan text plan * @return query physical plan */ - public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan) - throws ForemanSetupException { + public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan) throws ForemanSetupException { Pointer<String> textPlanCopy = textPlan == null ? null : new Pointer<>(textPlan.value); try { return getQueryPlan(context, sql, textPlan); } catch (Exception e) { + logger.trace("There was an error during conversion into physical plan. " + + "Will sync remote and local function registries if needed and retry " + + "in case if issue was due to missing function implementation."); if (context.getFunctionRegistry().syncWithRemoteRegistry( context.getDrillOperatorTable().getFunctionRegistryVersion())) { context.reloadDrillOperatorTable(); + logger.trace("Local function registry was synchronized with remote. Trying to find function one more time."); return getQueryPlan(context, sql, textPlanCopy); } throw e; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java index 4311f48bd45..ca26a243326 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java @@ -18,24 +18,37 @@ package org.apache.drill.exec.store.sys; import org.apache.drill.exec.exception.StoreException; -import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore; +import org.apache.drill.exec.store.sys.store.UndefinedVersionDelegatingStore; /** * A factory used to create {@link PersistentStore store} instances. - * */ public interface PersistentStoreProvider extends AutoCloseable { + /** * Gets or creates a {@link PersistentStore persistent store} for the given configuration. * * Note that implementors have liberty to cache previous {@link PersistentStore store} instances. * - * @param config store configuration - * @param <V> store value type + * @param config store configuration + * @param <V> store value type + * @return persistent store instance + * @throws StoreException in case when unable to create store */ <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException; + + /** + * Override this method if store supports versioning and return versioning instance. + * By default, undefined version wrapper will be used. + * + * @param config store configuration + * @param <V> store value type + * @return versioned persistent store instance + * @throws StoreException in case when unable to create store + */ default <V> VersionedPersistentStore<V> getOrCreateVersionedStore(PersistentStoreConfig<V> config) throws StoreException { - return new VersionedDelegatingStore<>(getOrCreateStore(config)); + // for those stores that do not support versioning + return new UndefinedVersionDelegatingStore<>(getOrCreateStore(config)); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java index d182de33174..76e5610def4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java @@ -17,9 +17,17 @@ */ package org.apache.drill.exec.store.sys.store; +/** + * Holder for store version. By default version is {@link DataChangeVersion#UNDEFINED}. + */ public class DataChangeVersion { - private int version; + // is used when store in unreachable + public static final int NOT_AVAILABLE = -1; + // is used when store does not support versioning + public static final int UNDEFINED = -2; + + private int version = UNDEFINED; public void setVersion(int version) { this.version = version; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java new file mode 100644 index 00000000000..5873ec0ac92 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys.store; + +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreMode; +import org.apache.drill.exec.store.sys.VersionedPersistentStore; + +import java.util.Iterator; +import java.util.Map; + +/** + * Wrapper store that delegates operations to PersistentStore. + * Does not keep versioning and returns {@link DataChangeVersion#UNDEFINED} when version is required. + * + * @param <V> store value type + */ +public class UndefinedVersionDelegatingStore<V> implements VersionedPersistentStore<V> { + + private final PersistentStore<V> store; + + public UndefinedVersionDelegatingStore(PersistentStore<V> store) { + this.store = store; + } + + @Override + public boolean contains(String key, DataChangeVersion version) { + version.setVersion(DataChangeVersion.UNDEFINED); + return store.contains(key); + } + + @Override + public V get(String key, DataChangeVersion version) { + version.setVersion(DataChangeVersion.UNDEFINED); + return store.get(key); + } + + @Override + public void put(String key, V value, DataChangeVersion version) { + store.put(key, value); + } + + @Override + public PersistentStoreMode getMode() { + return store.getMode(); + } + + @Override + public void delete(String key) { + store.delete(key); + } + + @Override + public boolean putIfAbsent(String key, V value) { + return store.putIfAbsent(key, value); + } + + @Override + public Iterator<Map.Entry<String, V>> getRange(int skip, int take) { + return store.getRange(skip, take); + } + + @Override + public void close() throws Exception { + store.close(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java index 18e0b826291..40576d55c71 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java @@ -30,22 +30,24 @@ import org.apache.drill.exec.store.sys.VersionedPersistentStore; /** - * Versioned Store that delegates operations to PersistentStore - * @param <V> + * Versioned store that delegates operations to PersistentStore and keeps versioning, + * incrementing version each time write / delete operation is triggered. + * Once created initial version is 0. Can be used only for local versioning, not distributed. + * + * @param <V> store value type */ public class VersionedDelegatingStore<V> implements VersionedPersistentStore<V> { private final PersistentStore<V> store; - private final ReadWriteLock readWriteLock; private final AutoCloseableLock readLock; private final AutoCloseableLock writeLock; private int version; public VersionedDelegatingStore(PersistentStore<V> store) { this.store = store; - readWriteLock = new ReentrantReadWriteLock(); + ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); readLock = new AutoCloseableLock(readWriteLock.readLock()); writeLock = new AutoCloseableLock(readWriteLock.writeLock()); - version = -1; + version = 0; } @Override @@ -113,7 +115,7 @@ public void close() throws Exception { try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) { store.close(); - version = -1; + version = DataChangeVersion.NOT_AVAILABLE; } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java index aa6ee9d1717..75cef2f472e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java @@ -17,21 +17,23 @@ */ package org.apache.drill.exec.store.sys.store.provider; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.shaded.guava.com.google.common.collect.Maps; +import org.apache.drill.exec.store.sys.VersionedPersistentStore; import org.apache.drill.common.AutoCloseables; import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.store.sys.PersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreProvider; +import org.apache.drill.common.util.function.CheckedFunction; public class CachingPersistentStoreProvider extends BasePersistentStoreProvider { -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingPersistentStoreProvider.class); - private final ConcurrentMap<PersistentStoreConfig<?>, PersistentStore<?>> storeCache = Maps.newConcurrentMap(); + private final Map<PersistentStoreConfig<?>, PersistentStore<?>> storeCache = new ConcurrentHashMap<>(); + private final Map<PersistentStoreConfig<?>, VersionedPersistentStore<?>> versionedStoreCache = new ConcurrentHashMap<>(); private final PersistentStoreProvider provider; public CachingPersistentStoreProvider(PersistentStoreProvider provider) { @@ -41,21 +43,15 @@ public CachingPersistentStoreProvider(PersistentStoreProvider provider) { @Override @SuppressWarnings("unchecked") public <V> PersistentStore<V> getOrCreateStore(final PersistentStoreConfig<V> config) throws StoreException { - final PersistentStore<?> store = storeCache.get(config); - if (store == null) { - final PersistentStore<?> newStore = provider.getOrCreateStore(config); - final PersistentStore<?> finalStore = storeCache.putIfAbsent(config, newStore); - if (finalStore == null) { - return (PersistentStore<V>)newStore; - } - try { - newStore.close(); - } catch (Exception ex) { - throw new StoreException(ex); - } - } + CheckedFunction<PersistentStoreConfig<?>, PersistentStore<?>, StoreException> function = provider::getOrCreateStore; + return (PersistentStore<V>) storeCache.computeIfAbsent(config, function); + } - return (PersistentStore<V>) store; + @Override + @SuppressWarnings("unchecked") + public <V> VersionedPersistentStore<V> getOrCreateVersionedStore(PersistentStoreConfig<V> config) throws StoreException { + CheckedFunction<PersistentStoreConfig<?>, VersionedPersistentStore<?>, StoreException> function = provider::getOrCreateVersionedStore; + return (VersionedPersistentStore<V>) versionedStoreCache.computeIfAbsent(config, function); } @Override @@ -65,12 +61,19 @@ public void start() throws Exception { @Override public void close() throws Exception { - final List<AutoCloseable> closeables = Lists.newArrayList(); - for (final AutoCloseable store : storeCache.values()) { - closeables.add(store); - } - closeables.add(provider); + List<AutoCloseable> closeables = new ArrayList<>(); + + // add un-versioned stores + closeables.addAll(storeCache.values()); storeCache.clear(); + + // add versioned stores + closeables.addAll(versionedStoreCache.values()); + versionedStoreCache.clear(); + + // add provider + closeables.add(provider); + AutoCloseables.close(closeables); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java index 3ab85ec291e..6a70df775dc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java @@ -17,11 +17,12 @@ */ package org.apache.drill.exec.store.sys.store.provider; -import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.store.sys.PersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreProvider; +import org.apache.drill.exec.store.sys.VersionedPersistentStore; import org.apache.drill.exec.store.sys.store.InMemoryStore; +import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore; public class InMemoryStoreProvider implements PersistentStoreProvider { @@ -35,10 +36,15 @@ public InMemoryStoreProvider(int capacity) { public void close() throws Exception { } @Override - public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException { + public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) { return new InMemoryStore<>(capacity); } @Override - public void start() throws Exception { } + public <V> VersionedPersistentStore<V> getOrCreateVersionedStore(PersistentStoreConfig<V> config) { + return new VersionedDelegatingStore<>(getOrCreateStore(config)); + } + + @Override + public void start() { } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java index af6777111b8..2dae62def52 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java @@ -26,7 +26,9 @@ import org.apache.drill.exec.store.sys.PersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreRegistry; +import org.apache.drill.exec.store.sys.VersionedPersistentStore; import org.apache.drill.exec.store.sys.store.LocalPersistentStore; +import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore; import org.apache.drill.exec.testing.store.NoWriteLocalStore; import org.apache.hadoop.fs.Path; @@ -70,6 +72,10 @@ public LocalPersistentStoreProvider(final DrillConfig config) throws StoreExcept } } + @Override + public <V> VersionedPersistentStore<V> getOrCreateVersionedStore(PersistentStoreConfig<V> config) { + return new VersionedDelegatingStore<>(getOrCreateStore(config)); + } @Override public void close() throws Exception { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java index 8f5252d1e25..978231575ae 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java @@ -20,7 +20,6 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap; import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.apache.drill.categories.SqlFunctionTest; import org.apache.drill.exec.expr.fn.DrillFuncHolder; import org.junit.Before; @@ -30,9 +29,11 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -51,7 +52,7 @@ @BeforeClass public static void init() { - newJars = Maps.newHashMap(); + newJars = new HashMap<>(); FunctionHolder lower = new FunctionHolder("lower", "lower(VARCHAR-REQUIRED)", mock(DrillFuncHolder.class)); FunctionHolder upper = new FunctionHolder("upper", "upper(VARCHAR-REQUIRED)", mock(DrillFuncHolder.class)); newJars.put(built_in, Lists.newArrayList(lower, upper)); @@ -69,9 +70,9 @@ public void setup() { @Test public void testVersion() { resetRegistry(); - long expectedVersion = 0; + int expectedVersion = 0; assertEquals("Initial version should be 0", expectedVersion, registryHolder.getVersion()); - registryHolder.addJars(Maps.<String, List<FunctionHolder>>newHashMap(), ++expectedVersion); + registryHolder.addJars(new HashMap<>(), ++expectedVersion); assertEquals("Version can change if no jars were added.", expectedVersion, registryHolder.getVersion()); fillInRegistry(++expectedVersion); assertEquals("Version should have incremented by 1", expectedVersion, registryHolder.getVersion()); @@ -87,7 +88,7 @@ public void testVersion() { public void testAddJars() { resetRegistry(); int functionsSize = 0; - List<String> jars = Lists.newArrayList(); + List<String> jars = new ArrayList<>(); ListMultimap<String, DrillFuncHolder> functionsWithHolders = ArrayListMultimap.create(); ListMultimap<String, String> functionsWithSignatures = ArrayListMultimap.create(); for (Map.Entry<String, List<FunctionHolder>> jar : newJars.entrySet()) { @@ -99,7 +100,7 @@ public void testAddJars() { } } - long expectedVersion = 0; + int expectedVersion = 0; registryHolder.addJars(newJars, ++expectedVersion); assertEquals("Version number should match", expectedVersion, registryHolder.getVersion()); compareTwoLists(jars, registryHolder.getAllJarNames()); @@ -112,7 +113,7 @@ public void testAddJars() { public void testAddTheSameJars() { resetRegistry(); int functionsSize = 0; - List<String> jars = Lists.newArrayList(); + List<String> jars = new ArrayList<>(); ListMultimap<String, DrillFuncHolder> functionsWithHolders = ArrayListMultimap.create(); ListMultimap<String, String> functionsWithSignatures = ArrayListMultimap.create(); for (Map.Entry<String, List<FunctionHolder>> jar : newJars.entrySet()) { @@ -123,7 +124,7 @@ public void testAddTheSameJars() { functionsSize++; } } - long expectedVersion = 0; + int expectedVersion = 0; registryHolder.addJars(newJars, ++expectedVersion); assertEquals("Version number should match", expectedVersion, registryHolder.getVersion()); compareTwoLists(jars, registryHolder.getAllJarNames()); @@ -150,16 +151,15 @@ public void testRemoveJar() { @Test public void testGetAllJarNames() { - ArrayList<String> expectedResult = Lists.newArrayList(newJars.keySet()); + List<String> expectedResult = new ArrayList<>(newJars.keySet()); compareTwoLists(expectedResult, registryHolder.getAllJarNames()); } @Test public void testGetFunctionNamesByJar() { - ArrayList<String> expectedResult = Lists.newArrayList(); - for (FunctionHolder functionHolder : newJars.get(built_in)) { - expectedResult.add(functionHolder.getName()); - } + List<String> expectedResult = newJars.get(built_in).stream() + .map(FunctionHolder::getName) + .collect(Collectors.toList()); compareTwoLists(expectedResult, registryHolder.getFunctionNamesByJar(built_in)); } @@ -171,7 +171,7 @@ public void testGetAllFunctionsWithHoldersWithVersion() { expectedResult.put(functionHolder.getName(), functionHolder.getHolder()); } } - AtomicLong version = new AtomicLong(); + AtomicInteger version = new AtomicInteger(); compareListMultimaps(expectedResult, registryHolder.getAllFunctionsWithHolders(version)); assertEquals("Version number should match", version.get(), registryHolder.getVersion()); } @@ -200,30 +200,26 @@ public void testGetAllFunctionsWithSignatures() { @Test public void testGetHoldersByFunctionNameWithVersion() { - List<DrillFuncHolder> expectedResult = Lists.newArrayList(); - for (List<FunctionHolder> functionHolders : newJars.values()) { - for (FunctionHolder functionHolder : functionHolders) { - if ("lower".equals(functionHolder.getName())) { - expectedResult.add(functionHolder.getHolder()); - } - } - } + List<DrillFuncHolder> expectedResult = newJars.values().stream() + .flatMap(Collection::stream) + .filter(f -> "lower".equals(f.getName())) + .map(FunctionHolder::getHolder) + .collect(Collectors.toList()); + assertFalse(expectedResult.isEmpty()); - AtomicLong version = new AtomicLong(); + AtomicInteger version = new AtomicInteger(); compareTwoLists(expectedResult, registryHolder.getHoldersByFunctionName("lower", version)); assertEquals("Version number should match", version.get(), registryHolder.getVersion()); } @Test public void testGetHoldersByFunctionName() { - List<DrillFuncHolder> expectedResult = Lists.newArrayList(); - for (List<FunctionHolder> functionHolders : newJars.values()) { - for (FunctionHolder functionHolder : functionHolders) { - if ("lower".equals(functionHolder.getName())) { - expectedResult.add(functionHolder.getHolder()); - } - } - } + List<DrillFuncHolder> expectedResult = newJars.values().stream() + .flatMap(Collection::stream) + .filter(f -> "lower".equals(f.getName())) + .map(FunctionHolder::getHolder) + .collect(Collectors.toList()); + assertFalse(expectedResult.isEmpty()); compareTwoLists(expectedResult, registryHolder.getHoldersByFunctionName("lower")); } @@ -236,10 +232,9 @@ public void testContainsJar() { @Test public void testFunctionsSize() { - int count = 0; - for (List<FunctionHolder> functionHolders : newJars.values()) { - count += functionHolders.size(); - } + int count = newJars.values().stream() + .mapToInt(List::size) + .sum(); assertEquals("Functions size should match", count, registryHolder.functionsSize()); } @@ -256,7 +251,7 @@ private void resetRegistry() { registryHolder = new FunctionRegistryHolder(); } - private void fillInRegistry(long version) { + private void fillInRegistry(int version) { registryHolder.addJars(newJars, version); } @@ -266,7 +261,7 @@ private void fillInRegistry(long version) { assertEquals("Multimaps size should match", m1.size(), m2.size()); for (Map.Entry<String, Collection<T>> entry : m1.entrySet()) { try { - compareTwoLists(Lists.newArrayList(entry.getValue()), Lists.newArrayList(m2.get(entry.getKey()))); + compareTwoLists(new ArrayList<>(entry.getValue()), new ArrayList<>(m2.get(entry.getKey()))); } catch (AssertionError e) { throw new AssertionError("Multimaps values should match", e); } @@ -275,9 +270,7 @@ private void fillInRegistry(long version) { private <T> void compareTwoLists(List<T> l1, List<T> l2) { assertEquals("Lists size should match", l1.size(), l2.size()); - for (T item : l1) { - assertTrue("Two lists should have the same values", l2.contains(item)); - } + l1.forEach(i -> assertTrue("Two lists should have the same values", l2.contains(i))); } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java index a5a3c5121aa..16ac42e27e9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java @@ -64,7 +64,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -762,7 +762,7 @@ public void testConcurrentRemoteRegistryUpdateWithDuplicates() throws Exception DataChangeVersion version = new DataChangeVersion(); Registry registry = remoteFunctionRegistry.getRegistry(version); - assertEquals("Remote registry version should match", 1, version.getVersion()); + assertEquals("Remote registry version should match", 2, version.getVersion()); List<Jar> jarList = registry.getJarList(); assertEquals("Only one jar should be registered", 1, jarList.size()); assertEquals("Jar name should match", jar1, jarList.get(0).getName()); @@ -823,7 +823,7 @@ public void testConcurrentRemoteRegistryUpdateForDifferentJars() throws Exceptio DataChangeVersion version = new DataChangeVersion(); Registry registry = remoteFunctionRegistry.getRegistry(version); - assertEquals("Remote registry version should match", 2, version.getVersion()); + assertEquals("Remote registry version should match", 3, version.getVersion()); List<Jar> actualJars = registry.getJarList(); List<String> expectedJars = Lists.newArrayList(jar1, jar2); @@ -861,7 +861,7 @@ public void testLazyInitConcurrent() throws Exception { assertTrue("syncWithRemoteRegistry() should return true", result); return true; }) - .when(functionImplementationRegistry).syncWithRemoteRegistry(anyLong()); + .when(functionImplementationRegistry).syncWithRemoteRegistry(anyInt()); SimpleQueryRunner simpleQueryRunner = new SimpleQueryRunner(query); Thread thread1 = new Thread(simpleQueryRunner); @@ -873,10 +873,10 @@ public void testLazyInitConcurrent() throws Exception { thread1.join(); thread2.join(); - verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyLong()); + verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyInt()); LocalFunctionRegistry localFunctionRegistry = (LocalFunctionRegistry)FieldUtils.readField( functionImplementationRegistry, "localFunctionRegistry", true); - assertEquals("Sync function registry version should match", 1L, localFunctionRegistry.getVersion()); + assertEquals("Sync function registry version should match", 2, localFunctionRegistry.getVersion()); } @Test @@ -895,7 +895,7 @@ public void testLazyInitNoReload() throws Exception { assertFalse("syncWithRemoteRegistry() should return false", result); return false; }) - .when(functionImplementationRegistry).syncWithRemoteRegistry(anyLong()); + .when(functionImplementationRegistry).syncWithRemoteRegistry(anyInt()); test("select custom_lower('A') from (values(1))"); @@ -906,10 +906,10 @@ public void testLazyInitNoReload() throws Exception { assertThat(e.getMessage(), containsString("No match found for function signature unknown_lower(<CHARACTER>)")); } - verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyLong()); + verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyInt()); LocalFunctionRegistry localFunctionRegistry = (LocalFunctionRegistry)FieldUtils.readField( functionImplementationRegistry, "localFunctionRegistry", true); - assertEquals("Sync function registry version should match", 1L, localFunctionRegistry.getVersion()); + assertEquals("Sync function registry version should match", 2, localFunctionRegistry.getVersion()); } private static String buildJars(String jarName, String includeFiles, String includeResources) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java index 1709bdf501a..c71a2d46168 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java @@ -19,7 +19,7 @@ import ch.qos.logback.classic.Level; import org.apache.drill.exec.client.LoggingResultsListener; -import org.apache.drill.exec.util.CheckedSupplier; +import org.apache.drill.common.util.function.CheckedSupplier; import org.apache.drill.exec.util.VectorUtil; import java.util.function.Supplier; ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Dynamic UDFs registered on one Drillbit are not visible on other Drillbits > -------------------------------------------------------------------------- > > Key: DRILL-6762 > URL: https://issues.apache.org/jira/browse/DRILL-6762 > Project: Apache Drill > Issue Type: Bug > Components: Functions - Drill > Affects Versions: 1.13.0 > Reporter: Kunal Khatua > Assignee: Arina Ielchiieva > Priority: Critical > Labels: ready-to-commit > Fix For: 1.15.0 > > Attachments: Dynamic UDFs issue.pdf > > > Originally Reported : > https://stackoverflow.com/questions/52480160/dynamic-udf-in-apache-drill-cluster > When using a 4-node Drill 1.14 cluster, UDF jars registered on one node are > not usable on other nodes despite the {{/registry}} and ZK showing the UDFs > as registered. > This was previously working on 1.13.0 > *Root cause* > 1. {{VersionedDelegatingStore}} was starting with version -1 and local > function registry with version 0. This caused issues when > {{LocalPersistentStore}} already existed on the file system. When adding jars > into remote registry its versioned was bumped to 0 and synchronization did > not happen since local registry had the same version. > *Fix*: start {{VersionedDelegatingStore}} with version 0, local function > registry with undefined version (-2) thus first sync will always happen. > 2. {{PersistentStoreProvider.getOrCreateVersionedStore}} was wrapping stores > into VersionedDelegatingStore for those store providers that did not override > this method. Only Zookeeper store was overriding it. But > {{VersionedDelegatingStore}} is only keeps versioning locally and thus can be > applied only for local stores, i.e. Hbase, Mongo cannot use it. > {{CachingPersistentStoreProvider}} did not override > {{getOrCreateVersionedStore}} either. Mostly all stores in Drill are created > using {{CachingPersistentStoreProvider}}. Thus all stores where wrapped into > {{VersionedDelegatingStore}}, even Zookeeper one which caused function > registries version synchronization issues. > *Fix*: Add {{UndefinedVersionDelegatingStore}} for those stores that do not > support versioning and wrap into it by default in > {{PersistentStoreProvider.getOrCreateVersionedStore}} if this method is not > overriden. {{UndefinedVersionDelegatingStore}} will return UNDEFINED version > (-2). During sync between remote and local registries if remote registry has > UNDEFINED version sync will be done immediately, on the contrary with > NOT_AVAILABLE version (-1) which indicates that remote function registry is > not accessible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)