[ 
https://issues.apache.org/jira/browse/DRILL-6762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641787#comment-16641787
 ] 

ASF GitHub Bot commented on DRILL-6762:
---------------------------------------

asfgit closed pull request #1484: DRILL-6762: Fix dynamic UDFs versioning issue
URL: https://github.com/apache/drill/pull/1484
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java 
b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
index 2a331bb9107..d2dc0f8a750 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/ErrorHelper.java
@@ -181,4 +181,17 @@ static UserException findWrappedUserException(Throwable 
ex) {
     return (UserException) cause;
   }
 
+  /**
+   * Helps to hide checked exception from the compiler but then actually throw 
it.
+   * Is useful when implementing functional interfaces that allow checked 
exceptions.
+   *
+   * @param e original exception instance
+   * @param <E> exception type
+   * @throws E exception instance
+   */
+  @SuppressWarnings("unchecked")
+  public static <E extends Throwable> void sneakyThrow(Throwable e) throws E {
+    throw (E) e;
+  }
+
 }
diff --git 
a/common/src/main/java/org/apache/drill/common/util/function/CheckedFunction.java
 
b/common/src/main/java/org/apache/drill/common/util/function/CheckedFunction.java
new file mode 100644
index 00000000000..050565e7a59
--- /dev/null
+++ 
b/common/src/main/java/org/apache/drill/common/util/function/CheckedFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.util.function;
+
+import java.util.function.Function;
+
+import static org.apache.drill.common.exceptions.ErrorHelper.sneakyThrow;
+
+/**
+ * Extension of {@link Function} that allows to throw checked exception.
+ *
+ * @param <T> function argument type
+ * @param <R> function result type
+ * @param <E> exception type
+ */
+@FunctionalInterface
+public interface CheckedFunction<T, R, E extends Throwable> extends 
Function<T, R> {
+
+  /**
+   * Overrides {@link Function#apply(Object)} method to allow calling 
functions that throw checked exceptions.
+   * Is useful when used in methods that accept {@link Function}.
+   * For example: {@link java.util.Map#computeIfAbsent(Object, Function)}.
+   *
+   * @param t the function argument
+   * @return the function result
+   */
+  @Override
+  default R apply(T t) {
+    try {
+      return applyAndThrow(t);
+    } catch (Throwable e) {
+      sneakyThrow(e);
+    }
+    // should never happen
+    throw new RuntimeException();
+  }
+
+  /**
+   * Applies function to the given argument.
+   *
+   * @param t the function argument
+   * @return the function result
+   * @throws E exception in case of errors
+   */
+  R applyAndThrow(T t) throws E;
+
+}
+
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java 
b/common/src/main/java/org/apache/drill/common/util/function/CheckedSupplier.java
similarity index 92%
rename from 
exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java
rename to 
common/src/main/java/org/apache/drill/common/util/function/CheckedSupplier.java
index b744ac8c033..60633839945 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java
+++ 
b/common/src/main/java/org/apache/drill/common/util/function/CheckedSupplier.java
@@ -15,10 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.util;
+package org.apache.drill.common.util.function;
 
 /**
- * The java standard library does not provide a lambda function interface for 
funtions that take no arguments,
+ * The java standard library does not provide a lambda function interface for 
functions that take no arguments,
  * but that throw an exception. So, we have to define our own here.
  * @param <T> The return type of the lambda function.
  * @param <E> The type of exception thrown by the lambda function.
diff --git 
a/common/src/test/java/org/apache/drill/common/util/function/TestCheckedFunction.java
 
b/common/src/test/java/org/apache/drill/common/util/function/TestCheckedFunction.java
new file mode 100644
index 00000000000..a1ab389836e
--- /dev/null
+++ 
b/common/src/test/java/org/apache/drill/common/util/function/TestCheckedFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.util.function;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestCheckedFunction {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testComputeIfAbsentWithCheckedFunction() {
+    ExceptionProducer producer = new ExceptionProducer();
+    Map<String, String> map = new HashMap<>();
+    String message = "Exception message";
+    CheckedFunction<String, String, Exception> function = 
producer::failWithMessage;
+
+    thrown.expect(Exception.class);
+    thrown.expectMessage(message);
+
+    map.computeIfAbsent(message, function);
+  }
+
+  private class ExceptionProducer {
+
+    String failWithMessage(String message) throws Exception {
+      throw new Exception(message);
+    }
+
+  }
+
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
index 135ccd4e1bb..35feaa9f282 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
@@ -25,7 +25,7 @@
   /**
    * Returns a {@link TransientStore transient store} instance for the given 
configuration.
    *
-   * Note that implementors have liberty to cache previous {@link 
PersistentStore store} instances.
+   * Note that implementors have liberty to cache previous {@link 
TransientStore store} instances.
    *
    * @param config  store configuration
    * @param <V>  store value type
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index f24f9aaef58..f4b83736739 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -25,11 +25,12 @@
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.net.URLConnection;
+import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
@@ -64,7 +65,6 @@
 
 import 
org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.exec.store.sys.store.DataChangeVersion;
 import org.apache.drill.exec.util.JarUtil;
 import org.apache.hadoop.fs.FileSystem;
@@ -83,7 +83,7 @@
   private final Path localUdfDir;
   private boolean deleteTmpDir = false;
   private File tmpDir;
-  private List<PluggableFunctionRegistry> pluggableFuncRegistries = 
Lists.newArrayList();
+  private List<PluggableFunctionRegistry> pluggableFuncRegistries = new 
ArrayList<>();
   private OptionSet optionManager;
   private final boolean useDynamicUdfs;
 
@@ -168,7 +168,7 @@ public void register(DrillOperatorTable operatorTable) {
    */
   @Override
   public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, 
FunctionCall functionCall) {
-    AtomicLong version = new AtomicLong();
+    AtomicInteger version = new AtomicInteger();
     String newFunctionName = functionReplacement(functionCall);
 
     // Dynamic UDFS: First try with exact match. If not found, we may need to
@@ -246,7 +246,7 @@ private DrillFuncHolder 
findExactMatchingDrillFunction(String name,
                                                          List<MajorType> 
argTypes,
                                                          MajorType returnType,
                                                          boolean retry) {
-    AtomicLong version = new AtomicLong();
+    AtomicInteger version = new AtomicInteger();
     for (DrillFuncHolder h : localFunctionRegistry.getMethods(name, version)) {
       if (h.matches(returnType, argTypes)) {
         return h;
@@ -321,19 +321,19 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry() 
{
   /**
    * Purpose of this method is to synchronize remote and local function 
registries if needed
    * and to inform if function registry was changed after given version.
-   *
+   * <p/>
    * To make synchronization as much light-weigh as possible, first only 
versions of both registries are checked
    * without any locking. If synchronization is needed, enters synchronized 
block to prevent others loading the same jars.
    * The need of synchronization is checked again (double-check lock) before 
comparing jars.
    * If any missing jars are found, they are downloaded to local udf area, 
each is wrapped into {@link JarScan}.
    * Once jar download is finished, all missing jars are registered in one 
batch.
    * In case if any errors during jars download / registration, these errors 
are logged.
-   *
+   * <p/>
    * During registration local function registry is updated with remote 
function registry version it is synced with.
    * When at least one jar of the missing jars failed to download / register,
    * local function registry version are not updated but jars that where 
successfully downloaded / registered
    * are added to local function registry.
-   *
+   * <p/>
    * If synchronization between remote and local function registry was not 
needed,
    * checks if given registry version matches latest sync version
    * to inform if function registry was changed after given version.
@@ -342,16 +342,16 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry() 
{
    * @return true if remote and local function registries were synchronized 
after given version
    */
   @SuppressWarnings("resource")
-  public boolean syncWithRemoteRegistry(long version) {
+  public boolean syncWithRemoteRegistry(int version) {
     // Do the version check only if a remote registry exists. It does
     // not exist for some JMockit-based unit tests.
     if (isRegistrySyncNeeded()) {
       synchronized (this) {
-        long localRegistryVersion = localFunctionRegistry.getVersion();
+        int localRegistryVersion = localFunctionRegistry.getVersion();
         if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), 
localRegistryVersion))  {
           DataChangeVersion remoteVersion = new DataChangeVersion();
           List<String> missingJars = 
getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, 
remoteVersion);
-          List<JarScan> jars = Lists.newArrayList();
+          List<JarScan> jars = new ArrayList<>();
           if (!missingJars.isEmpty()) {
             logger.info("Starting dynamic UDFs lazy-init process.\n" +
                 "The following jars are going to be downloaded and registered 
locally: " + missingJars);
@@ -381,7 +381,7 @@ public boolean syncWithRemoteRegistry(long version) {
               }
             }
           }
-          long latestRegistryVersion = jars.size() != missingJars.size() ?
+          int latestRegistryVersion = jars.size() != missingJars.size() ?
               localRegistryVersion : remoteVersion.getVersion();
           localFunctionRegistry.register(jars, latestRegistryVersion);
           return true;
@@ -392,23 +392,38 @@ public boolean syncWithRemoteRegistry(long version) {
     return version != localFunctionRegistry.getVersion();
   }
 
+  /**
+   * Checks if remote and local registries should be synchronized.
+   * Before comparing versions, checks if remote function registry is actually 
exists.
+   *
+   * @return true is local registry should be refreshed, false otherwise
+   */
   private boolean isRegistrySyncNeeded() {
+    logger.trace("Has remote function registry: {}", 
remoteFunctionRegistry.hasRegistry());
     return remoteFunctionRegistry.hasRegistry() &&
            isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), 
localFunctionRegistry.getVersion());
   }
 
   /**
    * Checks if local function registry should be synchronized with remote 
function registry.
-   * If remote function registry version is -1, it means that remote function 
registry is unreachable
-   * or is not configured thus we skip synchronization and return false.
-   * In all other cases synchronization is needed if remote and local function 
registries versions do not match.
+   *
+   * <ul>If remote function registry version is {@link 
DataChangeVersion#UNDEFINED},
+   * it means that remote function registry does not support versioning
+   * thus we need to synchronize both registries.</ul>
+   * <ul>If remote function registry version is {@link 
DataChangeVersion#NOT_AVAILABLE},
+   * it means that remote function registry is unreachable
+   * or is not configured thus we skip synchronization and return false.</ul>
+   * <ul>For all other cases synchronization is needed if remote
+   * and local function registries versions do not match.</ul>
    *
    * @param remoteVersion remote function registry version
    * @param localVersion local function registry version
    * @return true is local registry should be refreshed, false otherwise
    */
-  private boolean isRegistrySyncNeeded(long remoteVersion, long localVersion) {
-    return remoteVersion != -1 && remoteVersion != localVersion;
+  private boolean isRegistrySyncNeeded(int remoteVersion, int localVersion) {
+    logger.trace("Compare remote [{}] and local [{}] registry versions.", 
remoteVersion, localVersion);
+    return remoteVersion == DataChangeVersion.UNDEFINED ||
+        (remoteVersion != DataChangeVersion.NOT_AVAILABLE && remoteVersion != 
localVersion);
   }
 
   /**
@@ -459,7 +474,7 @@ private ScanResult scan(ClassLoader classLoader, Path path, 
URL[] urls) throws I
                                       DataChangeVersion version) {
     List<Jar> remoteJars = 
remoteFunctionRegistry.getRegistry(version).getJarList();
     List<String> localJars = localFunctionRegistry.getAllJarNames();
-    List<String> missingJars = Lists.newArrayList();
+    List<String> missingJars = new ArrayList<>();
     for (Jar jar : remoteJars) {
       if (!localJars.contains(jar.getName())) {
         missingJars.add(jar.getName());
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
index dc8fd74ffbc..d1d4fc94dfd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
@@ -19,18 +19,18 @@
 
 import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
 import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
 
 import org.apache.drill.common.AutoCloseables.Closeable;
 import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.exec.expr.fn.DrillFuncHolder;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -45,9 +45,10 @@
  * Holder is designed to allow concurrent reads and single writes to keep data 
consistent.
  * This is achieved by {@link ReadWriteLock} implementation usage.
  * Holder has number version which indicates remote function registry version 
number it is in sync with.
- *
+ * <p/>
  * Structure example:
  *
+ * <pre>
  * JARS
  * built-in   -> upper          -> upper(VARCHAR-REQUIRED)
  *            -> lower          -> lower(VARCHAR-REQUIRED)
@@ -72,12 +73,12 @@
  *
  * custom_lower -> custom_lower(VARCHAR-REQUIRED) -> function holder for 
custom_lower(VARCHAR-REQUIRED)
  *              -> custom_lower(VARCHAR-OPTIONAL) -> function holder for 
custom_lower(VARCHAR-OPTIONAL)
- *
+ * </pre>
  * where
- * First.jar is jar name represented by String
- * upper is function name represented by String
- * upper(VARCHAR-REQUIRED) is signature name represented by String which 
consist of function name, list of input parameters
- * function holder for upper(VARCHAR-REQUIRED) is {@link DrillFuncHolder} 
initiated for each function.
+ * <li><b>First.jar</b> is jar name represented by {@link String}.</li>
+ * <li><b>upper</b> is function name represented by {@link String}.</li>
+ * <li><b>upper(VARCHAR-REQUIRED)</b> is signature name represented by String 
which consist of function name, list of input parameters.</li>
+ * <li><b>function holder for upper(VARCHAR-REQUIRED)</b> is {@link 
DrillFuncHolder} initiated for each function.</li>
  *
  */
 public class FunctionRegistryHolder {
@@ -88,7 +89,7 @@
   private final AutoCloseableLock readLock = new 
AutoCloseableLock(readWriteLock.readLock());
   private final AutoCloseableLock writeLock = new 
AutoCloseableLock(readWriteLock.writeLock());
   // remote function registry number, it is in sync with
-  private long version;
+  private int version;
 
   // jar name, Map<function name, Queue<function signature>
   private final Map<String, Map<String, Queue<String>>> jars;
@@ -97,15 +98,15 @@
   private final Map<String, Map<String, DrillFuncHolder>> functions;
 
   public FunctionRegistryHolder() {
-    this.functions = Maps.newConcurrentMap();
-    this.jars = Maps.newConcurrentMap();
+    this.functions = new ConcurrentHashMap<>();
+    this.jars = new ConcurrentHashMap<>();
   }
 
   /**
    * This is read operation, so several users at a time can get this data.
    * @return local function registry version number
    */
-  public long getVersion() {
+  public int getVersion() {
     try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       return version;
     }
@@ -122,12 +123,12 @@ public long getVersion() {
    *
    * @param newJars jars and list of their function holders, each contains 
function name, signature and holder
    */
-  public void addJars(Map<String, List<FunctionHolder>> newJars, long version) 
{
+  public void addJars(Map<String, List<FunctionHolder>> newJars, int version) {
     try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
       for (Map.Entry<String, List<FunctionHolder>> newJar : 
newJars.entrySet()) {
         String jarName = newJar.getKey();
         removeAllByJar(jarName);
-        Map<String, Queue<String>> jar = Maps.newConcurrentMap();
+        Map<String, Queue<String>> jar = new ConcurrentHashMap<>();
         jars.put(jarName, jar);
         addFunctions(jar, newJar.getValue());
       }
@@ -156,7 +157,7 @@ public void removeJar(String jarName) {
    */
   public List<String> getAllJarNames() {
     try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
-      return Lists.newArrayList(jars.keySet());
+      return new ArrayList<>(jars.keySet());
     }
   }
 
@@ -171,7 +172,7 @@ public void removeJar(String jarName) {
   public List<String> getFunctionNamesByJar(String jarName) {
     try (@SuppressWarnings("unused") Closeable lock = readLock.open()){
       Map<String, Queue<String>> functions = jars.get(jarName);
-      return functions == null ? Lists.<String>newArrayList() : 
Lists.newArrayList(functions.keySet());
+      return functions == null ? new ArrayList<>() : new 
ArrayList<>(functions.keySet());
     }
   }
 
@@ -185,14 +186,14 @@ public void removeJar(String jarName) {
    * @param version version holder
    * @return all functions which their holders
    */
-  public ListMultimap<String, DrillFuncHolder> 
getAllFunctionsWithHolders(AtomicLong version) {
+  public ListMultimap<String, DrillFuncHolder> 
getAllFunctionsWithHolders(AtomicInteger version) {
     try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       if (version != null) {
         version.set(this.version);
       }
       ListMultimap<String, DrillFuncHolder> functionsWithHolders = 
ArrayListMultimap.create();
       for (Map.Entry<String, Map<String, DrillFuncHolder>> function : 
functions.entrySet()) {
-        functionsWithHolders.putAll(function.getKey(), 
Lists.newArrayList(function.getValue().values()));
+        functionsWithHolders.putAll(function.getKey(), new 
ArrayList<>(function.getValue().values()));
       }
       return functionsWithHolders;
     }
@@ -220,7 +221,7 @@ public void removeJar(String jarName) {
     try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       ListMultimap<String, String> functionsWithSignatures = 
ArrayListMultimap.create();
       for (Map.Entry<String, Map<String, DrillFuncHolder>> function : 
functions.entrySet()) {
-        functionsWithSignatures.putAll(function.getKey(), 
Lists.newArrayList(function.getValue().keySet()));
+        functionsWithSignatures.putAll(function.getKey(), new 
ArrayList<>(function.getValue().keySet()));
       }
       return functionsWithSignatures;
     }
@@ -236,13 +237,13 @@ public void removeJar(String jarName) {
    * @param version version holder
    * @return list of function holders
    */
-  public List<DrillFuncHolder> getHoldersByFunctionName(String functionName, 
AtomicLong version) {
+  public List<DrillFuncHolder> getHoldersByFunctionName(String functionName, 
AtomicInteger version) {
     try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
       if (version != null) {
         version.set(this.version);
       }
       Map<String, DrillFuncHolder> holders = functions.get(functionName);
-      return holders == null ? Lists.<DrillFuncHolder>newArrayList() : 
Lists.newArrayList(holders.values());
+      return holders == null ? new ArrayList<>() : new 
ArrayList<>(holders.values());
     }
   }
 
@@ -316,17 +317,13 @@ private void addFunctions(Map<String, Queue<String>> jar, 
List<FunctionHolder> n
       final String functionName = function.getName();
       Queue<String> jarFunctions = jar.get(functionName);
       if (jarFunctions == null) {
-        jarFunctions = Queues.newConcurrentLinkedQueue();
+        jarFunctions = new ConcurrentLinkedQueue<>();
         jar.put(functionName, jarFunctions);
       }
       final String functionSignature = function.getSignature();
       jarFunctions.add(functionSignature);
 
-      Map<String, DrillFuncHolder> signatures = functions.get(functionName);
-      if (signatures == null) {
-        signatures = Maps.newConcurrentMap();
-        functions.put(functionName, signatures);
-      }
+      Map<String, DrillFuncHolder> signatures = 
functions.computeIfAbsent(functionName, k -> new ConcurrentHashMap<>());
       signatures.put(functionSignature, function.getHolder());
     }
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
index 3740a6cc85d..cefbd8cf388 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
@@ -24,8 +24,9 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -77,13 +78,16 @@
   private final FunctionRegistryHolder registryHolder;
 
   /**
-   * Registers all functions present in Drill classpath on start-up. All 
functions will be marked as built-in.
-   * Built-in functions are not allowed to be unregistered. Initially sync 
registry version will be set to 0.
+   * Registers all functions present in Drill classpath on start-up.
+   * All functions will be marked as built-in. Built-in functions are not 
allowed to be unregistered.
+   * Since local function registry version is based on remote function 
registry version,
+   * initially sync version will be set to {@link DataChangeVersion#UNDEFINED}
+   * to ensure that upon first check both registries would be synchronized.
    */
   public LocalFunctionRegistry(ScanResult classpathScan) {
     registryHolder = new FunctionRegistryHolder();
     validate(BUILT_IN, classpathScan);
-    register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, 
this.getClass().getClassLoader())), 0);
+    register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, 
this.getClass().getClassLoader())), DataChangeVersion.UNDEFINED);
     if (logger.isTraceEnabled()) {
       StringBuilder allFunctions = new StringBuilder();
       for (DrillFuncHolder method: 
registryHolder.getAllFunctionsWithHolders().values()) {
@@ -96,7 +100,7 @@ public LocalFunctionRegistry(ScanResult classpathScan) {
   /**
    * @return remote function registry version number with which local function 
registry is synced
    */
-  public long getVersion() {
+  public int getVersion() {
     return registryHolder.getVersion();
   }
 
@@ -160,7 +164,7 @@ public long getVersion() {
    * @param jars list of jars to be registered
    * @param version remote function registry version number with which local 
function registry is synced
    */
-  public void register(List<JarScan> jars, long version) {
+  public void register(List<JarScan> jars, int version) {
     Map<String, List<FunctionHolder>> newJars = new HashMap<>();
     for (JarScan jarScan : jars) {
       FunctionConverter converter = new FunctionConverter();
@@ -219,7 +223,7 @@ public int size(){
    * @param name function name
    * @return all function holders associated with the function name. Function 
name is case insensitive.
    */
-  public List<DrillFuncHolder> getMethods(String name, AtomicLong version) {
+  public List<DrillFuncHolder> getMethods(String name, AtomicInteger version) {
     return registryHolder.getHoldersByFunctionName(name.toLowerCase(), 
version);
   }
 
@@ -238,7 +242,7 @@ public int size(){
    * @param operatorTable drill operator table
    */
   public void register(DrillOperatorTable operatorTable) {
-    AtomicLong versionHolder = new AtomicLong();
+    AtomicInteger versionHolder = new AtomicInteger();
     final Map<String, Collection<DrillFuncHolder>> registeredFunctions =
         registryHolder.getAllFunctionsWithHolders(versionHolder).asMap();
     operatorTable.setFunctionRegistryVersion(versionHolder.get());
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
index 4e947656f3a..f727a9374eb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
@@ -54,34 +54,36 @@
  * Creates all remote registry areas at startup and validates them,
  * during init establishes connections with three udf related stores.
  * Provides tools to work with three udf related stores, gives access to 
remote registry areas.
- *
+ * <p/>
  * There are three udf stores:
- * REGISTRY - persistent store, stores remote function registry {@link 
Registry} under udf path
+ *
+ * <li><b>REGISTRY</b> - persistent store, stores remote function registry 
{@link Registry} under udf path
  * which contains information about all dynamically registered jars and their 
function signatures.
- * If connection is created for the first time, puts empty remote registry.
+ * If connection is created for the first time, puts empty remote 
registry.</li>
  *
- * UNREGISTRATION - transient store, stores information under udf/unregister 
path.
+ * <li><b>UNREGISTRATION</b> - transient store, stores information under 
udf/unregister path.
  * udf/unregister path is persistent by itself but any child created will be 
transient.
  * Whenever user submits request to unregister jar, child path with jar name 
is created under this store.
  * This store also holds unregistration listener, which notifies all drill 
bits when child path is created,
- * so they can start local unregistration process.
+ * so they can start local unregistration process.</li>
  *
- * JARS - transient store, stores information under udf/jars path.
+ * <li><b>JARS</b> - transient store, stores information under udf/jars path.
  * udf/jars path is persistent by itself but any child created will be 
transient.
  * Servers as lock, not allowing to perform any action on the same time.
  * There two types of actions: {@link Action#REGISTRATION} and {@link 
Action#UNREGISTRATION}.
  * Before starting any action, users tries to create child path with jar name 
under this store
  * and if such path already exists, receives action being performed on that 
very jar.
- * When user finishes its action, he deletes child path with jar name.
- *
+ * When user finishes its action, he deletes child path with jar name.</li>
+ * <p/>
  * There are three udf areas:
- * STAGING - area where user copies binary and source jars before starting 
registration process.
- * REGISTRY - area where registered jars are stored.
- * TMP - area where source and binary jars are backed up in unique folder 
during registration process.
+ *
+ * <li><b>STAGING</b> - area where user copies binary and source jars before 
starting registration process.</li>
+ * <li><b>REGISTRY</b> - area where registered jars are stored.</li>
+ * <li><b>TMP</b> - area where source and binary jars are backed up in unique 
folder during registration process.</li>
  */
 public class RemoteFunctionRegistry implements AutoCloseable {
 
-  private static final String registry_path = "registry";
+  private static final String REGISTRY_PATH = "registry";
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RemoteFunctionRegistry.class);
   private static final ObjectMapper mapper = new 
ObjectMapper().enable(INDENT_OUTPUT);
 
@@ -112,19 +114,19 @@ public void init(DrillConfig config, 
PersistentStoreProvider storeProvider, Clus
    *
    * @return remote function registry version if any, -1 otherwise
    */
-  public long getRegistryVersion() {
+  public int getRegistryVersion() {
     DataChangeVersion version = new DataChangeVersion();
     boolean contains = false;
     try {
-      contains = registry.contains(registry_path, version);
+      contains = registry.contains(REGISTRY_PATH, version);
     } catch (Exception e) {
-      logger.error("Problem during trying to access remote function registry 
[{}]", registry_path, e);
+      logger.error("Problem during trying to access remote function registry 
[{}]", REGISTRY_PATH, e);
     }
     if (contains) {
       return version.getVersion();
     } else {
-      logger.error("Remote function registry [{}] is unreachable", 
registry_path);
-      return -1;
+      logger.error("Remote function registry [{}] is unreachable", 
REGISTRY_PATH);
+      return DataChangeVersion.NOT_AVAILABLE;
     }
   }
 
@@ -137,11 +139,11 @@ public long getRegistryVersion() {
   public boolean hasRegistry() { return registry != null; }
 
   public Registry getRegistry(DataChangeVersion version) {
-    return registry.get(registry_path, version);
+    return registry.get(REGISTRY_PATH, version);
   }
 
   public void updateRegistry(Registry registryContent, DataChangeVersion 
version) throws VersionMismatchException {
-    registry.put(registry_path, registryContent, version);
+    registry.put(REGISTRY_PATH, registryContent, version);
   }
 
   public void submitForUnregistration(String jar) {
@@ -193,7 +195,8 @@ private void prepareStores(PersistentStoreProvider 
storeProvider, ClusterCoordin
           .persist()
           .build();
       registry = storeProvider.getOrCreateVersionedStore(registrationConfig);
-      registry.putIfAbsent(registry_path, Registry.getDefaultInstance());
+      logger.trace("Remote function registry type: {}.", registry.getClass());
+      registry.putIfAbsent(REGISTRY_PATH, Registry.getDefaultInstance());
     } catch (StoreException e) {
       throw new DrillRuntimeException("Failure while loading remote 
registry.", e);
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
index e1e33098aae..eb79a5a65af 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
@@ -54,7 +54,7 @@
   private final ArrayListMultimap<String, SqlOperator> 
drillOperatorsWithInferenceMap = ArrayListMultimap.create();
   // indicates remote function registry version based on which drill operator 
were loaded
   // is used to define if we need to reload operator table in case remote 
function registry version has changed
-  private long functionRegistryVersion;
+  private int functionRegistryVersion;
 
   private final OptionManager systemOptionManager;
 
@@ -70,14 +70,14 @@ public DrillOperatorTable(FunctionImplementationRegistry 
registry, OptionManager
    *
    * @param version registry version
    */
-  public void setFunctionRegistryVersion(long version) {
+  public void setFunctionRegistryVersion(int version) {
     functionRegistryVersion = version;
   }
 
   /**
    * @return function registry version based on which operator table was loaded
    */
-  public long getFunctionRegistryVersion() {
+  public int getFunctionRegistryVersion() {
     return functionRegistryVersion;
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index e3cd7e460db..41faea96c13 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -21,7 +21,6 @@
 
 import org.apache.calcite.sql.SqlDescribeSchema;
 import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.drill.common.exceptions.UserException;
@@ -58,8 +57,7 @@ private DrillSqlWorker() {
    * @param sql sql query
    * @return query physical plan
    */
-  public static PhysicalPlan getPlan(QueryContext context, String sql) throws 
SqlParseException, ValidationException,
-      ForemanSetupException {
+  public static PhysicalPlan getPlan(QueryContext context, String sql) throws 
ForemanSetupException {
     return getPlan(context, sql, null);
   }
 
@@ -76,15 +74,18 @@ public static PhysicalPlan getPlan(QueryContext context, 
String sql) throws SqlP
    * @param textPlan text plan
    * @return query physical plan
    */
-  public static PhysicalPlan getPlan(QueryContext context, String sql, 
Pointer<String> textPlan)
-      throws ForemanSetupException {
+  public static PhysicalPlan getPlan(QueryContext context, String sql, 
Pointer<String> textPlan) throws ForemanSetupException {
     Pointer<String> textPlanCopy = textPlan == null ? null : new 
Pointer<>(textPlan.value);
     try {
       return getQueryPlan(context, sql, textPlan);
     } catch (Exception e) {
+      logger.trace("There was an error during conversion into physical plan. " 
+
+          "Will sync remote and local function registries if needed and retry 
" +
+          "in case if issue was due to missing function implementation.");
       if (context.getFunctionRegistry().syncWithRemoteRegistry(
           context.getDrillOperatorTable().getFunctionRegistryVersion())) {
         context.reloadDrillOperatorTable();
+        logger.trace("Local function registry was synchronized with remote. 
Trying to find function one more time.");
         return getQueryPlan(context, sql, textPlanCopy);
       }
       throw e;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
index 4311f48bd45..ca26a243326 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
@@ -18,24 +18,37 @@
 package org.apache.drill.exec.store.sys;
 
 import org.apache.drill.exec.exception.StoreException;
-import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
+import org.apache.drill.exec.store.sys.store.UndefinedVersionDelegatingStore;
 
 /**
  * A factory used to create {@link PersistentStore store} instances.
- *
  */
 public interface PersistentStoreProvider extends AutoCloseable {
+
   /**
    * Gets or creates a {@link PersistentStore persistent store} for the given 
configuration.
    *
    * Note that implementors have liberty to cache previous {@link 
PersistentStore store} instances.
    *
-   * @param config  store configuration
-   * @param <V>  store value type
+   * @param config store configuration
+   * @param <V> store value type
+   * @return persistent store instance
+   * @throws StoreException in case when unable to create store
    */
   <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) 
throws StoreException;
+
+  /**
+   * Override this method if store supports versioning and return versioning 
instance.
+   * By default, undefined version wrapper will be used.
+   *
+   * @param config store configuration
+   * @param <V> store value type
+   * @return versioned persistent store instance
+   * @throws StoreException in case when unable to create store
+   */
   default <V> VersionedPersistentStore<V> 
getOrCreateVersionedStore(PersistentStoreConfig<V> config) throws 
StoreException {
-    return new VersionedDelegatingStore<>(getOrCreateStore(config));
+    // for those stores that do not support versioning
+    return new UndefinedVersionDelegatingStore<>(getOrCreateStore(config));
   }
 
   /**
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
index d182de33174..76e5610def4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
@@ -17,9 +17,17 @@
  */
 package org.apache.drill.exec.store.sys.store;
 
+/**
+ * Holder for store version. By default version is {@link 
DataChangeVersion#UNDEFINED}.
+ */
 public class DataChangeVersion {
 
-  private int version;
+  // is used when store in unreachable
+  public static final int NOT_AVAILABLE = -1;
+  // is used when store does not support versioning
+  public static final int UNDEFINED = -2;
+
+  private int version = UNDEFINED;
 
   public void setVersion(int version) {
     this.version = version;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java
new file mode 100644
index 00000000000..5873ec0ac92
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store;
+
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Wrapper store that delegates operations to PersistentStore.
+ * Does not keep versioning and returns {@link DataChangeVersion#UNDEFINED} 
when version is required.
+ *
+ * @param <V> store value type
+ */
+public class UndefinedVersionDelegatingStore<V> implements 
VersionedPersistentStore<V> {
+
+  private final PersistentStore<V> store;
+
+  public UndefinedVersionDelegatingStore(PersistentStore<V> store) {
+    this.store = store;
+  }
+
+  @Override
+  public boolean contains(String key, DataChangeVersion version) {
+    version.setVersion(DataChangeVersion.UNDEFINED);
+    return store.contains(key);
+  }
+
+  @Override
+  public V get(String key, DataChangeVersion version) {
+    version.setVersion(DataChangeVersion.UNDEFINED);
+    return store.get(key);
+  }
+
+  @Override
+  public void put(String key, V value, DataChangeVersion version) {
+    store.put(key, value);
+  }
+
+  @Override
+  public PersistentStoreMode getMode() {
+    return store.getMode();
+  }
+
+  @Override
+  public void delete(String key) {
+    store.delete(key);
+  }
+
+  @Override
+  public boolean putIfAbsent(String key, V value) {
+    return store.putIfAbsent(key, value);
+  }
+
+  @Override
+  public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
+    return store.getRange(skip, take);
+  }
+
+  @Override
+  public void close() throws Exception {
+    store.close();
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
index 18e0b826291..40576d55c71 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
@@ -30,22 +30,24 @@
 import org.apache.drill.exec.store.sys.VersionedPersistentStore;
 
 /**
- * Versioned Store that delegates operations to PersistentStore
- * @param <V>
+ * Versioned store that delegates operations to PersistentStore and keeps 
versioning,
+ * incrementing version each time write / delete operation is triggered.
+ * Once created initial version is 0. Can be used only for local versioning, 
not distributed.
+ *
+ * @param <V> store value type
  */
 public class VersionedDelegatingStore<V> implements 
VersionedPersistentStore<V> {
   private final PersistentStore<V> store;
-  private final ReadWriteLock readWriteLock;
   private final AutoCloseableLock readLock;
   private final AutoCloseableLock writeLock;
   private int version;
 
   public VersionedDelegatingStore(PersistentStore<V> store) {
     this.store = store;
-    readWriteLock = new ReentrantReadWriteLock();
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     readLock = new AutoCloseableLock(readWriteLock.readLock());
     writeLock = new AutoCloseableLock(readWriteLock.writeLock());
-    version = -1;
+    version = 0;
   }
 
   @Override
@@ -113,7 +115,7 @@ public void close() throws Exception
   {
     try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
       store.close();
-      version = -1;
+      version = DataChangeVersion.NOT_AVAILABLE;
     }
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
index aa6ee9d1717..75cef2f472e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
@@ -17,21 +17,23 @@
  */
 package org.apache.drill.exec.store.sys.store.provider;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
 import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.exec.exception.StoreException;
 import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.common.util.function.CheckedFunction;
 
 public class CachingPersistentStoreProvider extends 
BasePersistentStoreProvider {
-//  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CachingPersistentStoreProvider.class);
 
-  private final ConcurrentMap<PersistentStoreConfig<?>, PersistentStore<?>> 
storeCache = Maps.newConcurrentMap();
+  private final Map<PersistentStoreConfig<?>, PersistentStore<?>> storeCache = 
new ConcurrentHashMap<>();
+  private final Map<PersistentStoreConfig<?>, VersionedPersistentStore<?>> 
versionedStoreCache = new ConcurrentHashMap<>();
   private final PersistentStoreProvider provider;
 
   public CachingPersistentStoreProvider(PersistentStoreProvider provider) {
@@ -41,21 +43,15 @@ public 
CachingPersistentStoreProvider(PersistentStoreProvider provider) {
   @Override
   @SuppressWarnings("unchecked")
   public <V> PersistentStore<V> getOrCreateStore(final 
PersistentStoreConfig<V> config) throws StoreException {
-    final PersistentStore<?> store = storeCache.get(config);
-    if (store == null) {
-      final PersistentStore<?> newStore = provider.getOrCreateStore(config);
-      final PersistentStore<?> finalStore = storeCache.putIfAbsent(config, 
newStore);
-      if (finalStore == null) {
-        return (PersistentStore<V>)newStore;
-      }
-      try {
-        newStore.close();
-      } catch (Exception ex) {
-        throw new StoreException(ex);
-      }
-    }
+    CheckedFunction<PersistentStoreConfig<?>, PersistentStore<?>, 
StoreException> function = provider::getOrCreateStore;
+    return (PersistentStore<V>) storeCache.computeIfAbsent(config, function);
+  }
 
-    return (PersistentStore<V>) store;
+  @Override
+  @SuppressWarnings("unchecked")
+  public <V> VersionedPersistentStore<V> 
getOrCreateVersionedStore(PersistentStoreConfig<V> config) throws 
StoreException {
+    CheckedFunction<PersistentStoreConfig<?>, VersionedPersistentStore<?>, 
StoreException> function = provider::getOrCreateVersionedStore;
+    return (VersionedPersistentStore<V>) 
versionedStoreCache.computeIfAbsent(config, function);
   }
 
   @Override
@@ -65,12 +61,19 @@ public void start() throws Exception {
 
   @Override
   public void close() throws Exception {
-    final List<AutoCloseable> closeables = Lists.newArrayList();
-    for (final AutoCloseable store : storeCache.values()) {
-      closeables.add(store);
-    }
-    closeables.add(provider);
+    List<AutoCloseable> closeables = new ArrayList<>();
+
+    // add un-versioned stores
+    closeables.addAll(storeCache.values());
     storeCache.clear();
+
+    // add versioned stores
+    closeables.addAll(versionedStoreCache.values());
+    versionedStoreCache.clear();
+
+    // add provider
+    closeables.add(provider);
+
     AutoCloseables.close(closeables);
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
index 3ab85ec291e..6a70df775dc 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
@@ -17,11 +17,12 @@
  */
 package org.apache.drill.exec.store.sys.store.provider;
 
-import org.apache.drill.exec.exception.StoreException;
 import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
 import org.apache.drill.exec.store.sys.store.InMemoryStore;
+import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
 
 public class InMemoryStoreProvider implements PersistentStoreProvider {
 
@@ -35,10 +36,15 @@ public InMemoryStoreProvider(int capacity) {
   public void close() throws Exception { }
 
   @Override
-  public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> 
config) throws StoreException {
+  public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> 
config) {
     return new InMemoryStore<>(capacity);
   }
 
   @Override
-  public void start() throws Exception { }
+  public <V> VersionedPersistentStore<V> 
getOrCreateVersionedStore(PersistentStoreConfig<V> config) {
+    return new VersionedDelegatingStore<>(getOrCreateStore(config));
+  }
+
+  @Override
+  public void start() { }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
index af6777111b8..2dae62def52 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
@@ -26,7 +26,9 @@
 import org.apache.drill.exec.store.sys.PersistentStore;
 import org.apache.drill.exec.store.sys.PersistentStoreConfig;
 import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
 import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
+import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
 import org.apache.drill.exec.testing.store.NoWriteLocalStore;
 import org.apache.hadoop.fs.Path;
 
@@ -70,6 +72,10 @@ public LocalPersistentStoreProvider(final DrillConfig 
config) throws StoreExcept
     }
   }
 
+  @Override
+  public <V> VersionedPersistentStore<V> 
getOrCreateVersionedStore(PersistentStoreConfig<V> config) {
+    return new VersionedDelegatingStore<>(getOrCreateStore(config));
+  }
 
   @Override
   public void close() throws Exception {
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java
index 8f5252d1e25..978231575ae 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java
@@ -20,7 +20,6 @@
 import 
org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
 import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.drill.categories.SqlFunctionTest;
 import org.apache.drill.exec.expr.fn.DrillFuncHolder;
 import org.junit.Before;
@@ -30,9 +29,11 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -51,7 +52,7 @@
 
   @BeforeClass
   public static void init() {
-    newJars = Maps.newHashMap();
+    newJars = new HashMap<>();
     FunctionHolder lower = new FunctionHolder("lower", 
"lower(VARCHAR-REQUIRED)", mock(DrillFuncHolder.class));
     FunctionHolder upper = new FunctionHolder("upper", 
"upper(VARCHAR-REQUIRED)", mock(DrillFuncHolder.class));
     newJars.put(built_in, Lists.newArrayList(lower, upper));
@@ -69,9 +70,9 @@ public void setup() {
   @Test
   public void testVersion() {
     resetRegistry();
-    long expectedVersion = 0;
+    int expectedVersion = 0;
     assertEquals("Initial version should be 0", expectedVersion, 
registryHolder.getVersion());
-    registryHolder.addJars(Maps.<String, List<FunctionHolder>>newHashMap(), 
++expectedVersion);
+    registryHolder.addJars(new HashMap<>(), ++expectedVersion);
     assertEquals("Version can change if no jars were added.", expectedVersion, 
registryHolder.getVersion());
     fillInRegistry(++expectedVersion);
     assertEquals("Version should have incremented by 1", expectedVersion, 
registryHolder.getVersion());
@@ -87,7 +88,7 @@ public void testVersion() {
   public void testAddJars() {
     resetRegistry();
     int functionsSize = 0;
-    List<String> jars = Lists.newArrayList();
+    List<String> jars = new ArrayList<>();
     ListMultimap<String, DrillFuncHolder> functionsWithHolders = 
ArrayListMultimap.create();
     ListMultimap<String, String> functionsWithSignatures = 
ArrayListMultimap.create();
     for (Map.Entry<String, List<FunctionHolder>> jar : newJars.entrySet()) {
@@ -99,7 +100,7 @@ public void testAddJars() {
       }
     }
 
-    long expectedVersion = 0;
+    int expectedVersion = 0;
     registryHolder.addJars(newJars, ++expectedVersion);
     assertEquals("Version number should match", expectedVersion, 
registryHolder.getVersion());
     compareTwoLists(jars, registryHolder.getAllJarNames());
@@ -112,7 +113,7 @@ public void testAddJars() {
   public void testAddTheSameJars() {
     resetRegistry();
     int functionsSize = 0;
-    List<String> jars = Lists.newArrayList();
+    List<String> jars = new ArrayList<>();
     ListMultimap<String, DrillFuncHolder> functionsWithHolders = 
ArrayListMultimap.create();
     ListMultimap<String, String> functionsWithSignatures = 
ArrayListMultimap.create();
     for (Map.Entry<String, List<FunctionHolder>> jar : newJars.entrySet()) {
@@ -123,7 +124,7 @@ public void testAddTheSameJars() {
         functionsSize++;
       }
     }
-    long expectedVersion = 0;
+    int expectedVersion = 0;
     registryHolder.addJars(newJars, ++expectedVersion);
     assertEquals("Version number should match", expectedVersion, 
registryHolder.getVersion());
     compareTwoLists(jars, registryHolder.getAllJarNames());
@@ -150,16 +151,15 @@ public void testRemoveJar() {
 
   @Test
   public void testGetAllJarNames() {
-    ArrayList<String> expectedResult = Lists.newArrayList(newJars.keySet());
+    List<String> expectedResult = new ArrayList<>(newJars.keySet());
     compareTwoLists(expectedResult, registryHolder.getAllJarNames());
   }
 
   @Test
   public void testGetFunctionNamesByJar() {
-    ArrayList<String> expectedResult = Lists.newArrayList();
-    for (FunctionHolder functionHolder : newJars.get(built_in)) {
-      expectedResult.add(functionHolder.getName());
-    }
+    List<String> expectedResult = newJars.get(built_in).stream()
+        .map(FunctionHolder::getName)
+        .collect(Collectors.toList());
     compareTwoLists(expectedResult, 
registryHolder.getFunctionNamesByJar(built_in));
   }
 
@@ -171,7 +171,7 @@ public void testGetAllFunctionsWithHoldersWithVersion() {
         expectedResult.put(functionHolder.getName(), 
functionHolder.getHolder());
       }
     }
-    AtomicLong version = new AtomicLong();
+    AtomicInteger version = new AtomicInteger();
     compareListMultimaps(expectedResult, 
registryHolder.getAllFunctionsWithHolders(version));
     assertEquals("Version number should match", version.get(), 
registryHolder.getVersion());
   }
@@ -200,30 +200,26 @@ public void testGetAllFunctionsWithSignatures() {
 
   @Test
   public void testGetHoldersByFunctionNameWithVersion() {
-    List<DrillFuncHolder> expectedResult = Lists.newArrayList();
-    for (List<FunctionHolder> functionHolders : newJars.values()) {
-      for (FunctionHolder functionHolder : functionHolders) {
-        if ("lower".equals(functionHolder.getName())) {
-          expectedResult.add(functionHolder.getHolder());
-        }
-      }
-    }
+    List<DrillFuncHolder> expectedResult = newJars.values().stream()
+        .flatMap(Collection::stream)
+        .filter(f -> "lower".equals(f.getName()))
+        .map(FunctionHolder::getHolder)
+        .collect(Collectors.toList());
+
     assertFalse(expectedResult.isEmpty());
-    AtomicLong version = new AtomicLong();
+    AtomicInteger version = new AtomicInteger();
     compareTwoLists(expectedResult, 
registryHolder.getHoldersByFunctionName("lower", version));
     assertEquals("Version number should match", version.get(), 
registryHolder.getVersion());
   }
 
   @Test
   public void testGetHoldersByFunctionName() {
-    List<DrillFuncHolder> expectedResult = Lists.newArrayList();
-    for (List<FunctionHolder> functionHolders : newJars.values()) {
-      for (FunctionHolder functionHolder : functionHolders) {
-        if ("lower".equals(functionHolder.getName())) {
-          expectedResult.add(functionHolder.getHolder());
-        }
-      }
-    }
+    List<DrillFuncHolder> expectedResult = newJars.values().stream()
+        .flatMap(Collection::stream)
+        .filter(f -> "lower".equals(f.getName()))
+        .map(FunctionHolder::getHolder)
+        .collect(Collectors.toList());
+
     assertFalse(expectedResult.isEmpty());
     compareTwoLists(expectedResult, 
registryHolder.getHoldersByFunctionName("lower"));
   }
@@ -236,10 +232,9 @@ public void testContainsJar() {
 
   @Test
   public void testFunctionsSize() {
-    int count = 0;
-    for (List<FunctionHolder> functionHolders : newJars.values()) {
-      count += functionHolders.size();
-    }
+    int count = newJars.values().stream()
+        .mapToInt(List::size)
+        .sum();
     assertEquals("Functions size should match", count, 
registryHolder.functionsSize());
   }
 
@@ -256,7 +251,7 @@ private void resetRegistry() {
     registryHolder = new FunctionRegistryHolder();
   }
 
-  private void fillInRegistry(long version) {
+  private void fillInRegistry(int version) {
     registryHolder.addJars(newJars, version);
   }
 
@@ -266,7 +261,7 @@ private void fillInRegistry(long version) {
     assertEquals("Multimaps size should match", m1.size(), m2.size());
     for (Map.Entry<String, Collection<T>> entry : m1.entrySet()) {
       try {
-        compareTwoLists(Lists.newArrayList(entry.getValue()), 
Lists.newArrayList(m2.get(entry.getKey())));
+        compareTwoLists(new ArrayList<>(entry.getValue()), new 
ArrayList<>(m2.get(entry.getKey())));
       } catch (AssertionError e) {
         throw new AssertionError("Multimaps values should match", e);
       }
@@ -275,9 +270,7 @@ private void fillInRegistry(long version) {
 
   private <T> void compareTwoLists(List<T> l1, List<T> l2) {
     assertEquals("Lists size should match", l1.size(), l2.size());
-    for (T item : l1) {
-      assertTrue("Two lists should have the same values", l2.contains(item));
-    }
+    l1.forEach(i -> assertTrue("Two lists should have the same values", 
l2.contains(i)));
   }
 
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
index a5a3c5121aa..16ac42e27e9 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
@@ -64,7 +64,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
@@ -762,7 +762,7 @@ public void 
testConcurrentRemoteRegistryUpdateWithDuplicates() throws Exception
 
     DataChangeVersion version = new DataChangeVersion();
     Registry registry = remoteFunctionRegistry.getRegistry(version);
-    assertEquals("Remote registry version should match", 1, 
version.getVersion());
+    assertEquals("Remote registry version should match", 2, 
version.getVersion());
     List<Jar> jarList = registry.getJarList();
     assertEquals("Only one jar should be registered", 1, jarList.size());
     assertEquals("Jar name should match", jar1, jarList.get(0).getName());
@@ -823,7 +823,7 @@ public void 
testConcurrentRemoteRegistryUpdateForDifferentJars() throws Exceptio
 
     DataChangeVersion version = new DataChangeVersion();
     Registry registry = remoteFunctionRegistry.getRegistry(version);
-    assertEquals("Remote registry version should match", 2, 
version.getVersion());
+    assertEquals("Remote registry version should match", 3, 
version.getVersion());
 
     List<Jar> actualJars = registry.getJarList();
     List<String> expectedJars = Lists.newArrayList(jar1, jar2);
@@ -861,7 +861,7 @@ public void testLazyInitConcurrent() throws Exception {
           assertTrue("syncWithRemoteRegistry() should return true", result);
           return true;
         })
-        
.when(functionImplementationRegistry).syncWithRemoteRegistry(anyLong());
+        .when(functionImplementationRegistry).syncWithRemoteRegistry(anyInt());
 
     SimpleQueryRunner simpleQueryRunner = new SimpleQueryRunner(query);
     Thread thread1 = new Thread(simpleQueryRunner);
@@ -873,10 +873,10 @@ public void testLazyInitConcurrent() throws Exception {
     thread1.join();
     thread2.join();
 
-    verify(functionImplementationRegistry, 
times(2)).syncWithRemoteRegistry(anyLong());
+    verify(functionImplementationRegistry, 
times(2)).syncWithRemoteRegistry(anyInt());
     LocalFunctionRegistry localFunctionRegistry = 
(LocalFunctionRegistry)FieldUtils.readField(
         functionImplementationRegistry, "localFunctionRegistry", true);
-    assertEquals("Sync function registry version should match", 1L, 
localFunctionRegistry.getVersion());
+    assertEquals("Sync function registry version should match", 2, 
localFunctionRegistry.getVersion());
   }
 
   @Test
@@ -895,7 +895,7 @@ public void testLazyInitNoReload() throws Exception {
           assertFalse("syncWithRemoteRegistry() should return false", result);
           return false;
         })
-        
.when(functionImplementationRegistry).syncWithRemoteRegistry(anyLong());
+        .when(functionImplementationRegistry).syncWithRemoteRegistry(anyInt());
 
     test("select custom_lower('A') from (values(1))");
 
@@ -906,10 +906,10 @@ public void testLazyInitNoReload() throws Exception {
       assertThat(e.getMessage(), containsString("No match found for function 
signature unknown_lower(<CHARACTER>)"));
     }
 
-    verify(functionImplementationRegistry, 
times(2)).syncWithRemoteRegistry(anyLong());
+    verify(functionImplementationRegistry, 
times(2)).syncWithRemoteRegistry(anyInt());
     LocalFunctionRegistry localFunctionRegistry = 
(LocalFunctionRegistry)FieldUtils.readField(
         functionImplementationRegistry, "localFunctionRegistry", true);
-    assertEquals("Sync function registry version should match", 1L, 
localFunctionRegistry.getVersion());
+    assertEquals("Sync function registry version should match", 2, 
localFunctionRegistry.getVersion());
   }
 
   private static String buildJars(String jarName, String includeFiles, String 
includeResources) {
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
index 1709bdf501a..c71a2d46168 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PrintingUtils.java
@@ -19,7 +19,7 @@
 
 import ch.qos.logback.classic.Level;
 import org.apache.drill.exec.client.LoggingResultsListener;
-import org.apache.drill.exec.util.CheckedSupplier;
+import org.apache.drill.common.util.function.CheckedSupplier;
 import org.apache.drill.exec.util.VectorUtil;
 
 import java.util.function.Supplier;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Dynamic UDFs registered on one Drillbit are not visible on other Drillbits
> --------------------------------------------------------------------------
>
>                 Key: DRILL-6762
>                 URL: https://issues.apache.org/jira/browse/DRILL-6762
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Functions - Drill
>    Affects Versions: 1.13.0
>            Reporter: Kunal Khatua
>            Assignee: Arina Ielchiieva
>            Priority: Critical
>              Labels: ready-to-commit
>             Fix For: 1.15.0
>
>         Attachments: Dynamic UDFs issue.pdf
>
>
> Originally Reported : 
> https://stackoverflow.com/questions/52480160/dynamic-udf-in-apache-drill-cluster
> When using a 4-node Drill 1.14 cluster, UDF jars registered on one node are 
> not usable on other nodes despite the {{/registry}} and ZK showing the UDFs 
> as registered.
> This was previously working on 1.13.0
> *Root cause*
> 1. {{VersionedDelegatingStore}} was starting with version -1 and local 
> function registry with version 0. This caused issues when 
> {{LocalPersistentStore}} already existed on the file system. When adding jars 
> into remote registry its versioned was bumped to 0 and synchronization did 
> not happen since local registry had the same version.
> *Fix*: start {{VersionedDelegatingStore}} with version 0, local function 
> registry with undefined version (-2) thus first sync will always happen.
> 2. {{PersistentStoreProvider.getOrCreateVersionedStore}} was wrapping stores 
> into VersionedDelegatingStore for those store providers that did not override 
> this method. Only Zookeeper store was overriding it. But 
> {{VersionedDelegatingStore}} is only keeps versioning locally and thus can be 
> applied only for local stores, i.e. Hbase, Mongo cannot use it.
> {{CachingPersistentStoreProvider}} did not override 
> {{getOrCreateVersionedStore}} either. Mostly all stores in Drill are created 
> using {{CachingPersistentStoreProvider}}. Thus all stores where wrapped into 
> {{VersionedDelegatingStore}}, even Zookeeper one which caused function 
> registries version synchronization issues.
> *Fix*: Add {{UndefinedVersionDelegatingStore}} for those stores that do not 
> support versioning and wrap into it by default in 
> {{PersistentStoreProvider.getOrCreateVersionedStore}} if this method is not 
> overriden. {{UndefinedVersionDelegatingStore}}  will return UNDEFINED version 
> (-2). During sync between remote and local registries if remote registry has 
> UNDEFINED version sync will be done immediately, on the contrary with 
> NOT_AVAILABLE version (-1) which indicates that remote function registry is 
> not accessible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to