This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c2e14ff411e [FLINK-32512][table] Don't register resource to user 
resource manager when creating temporary function
c2e14ff411e is described below

commit c2e14ff411e806f9ccf176c85eb8249b8ff12e56
Author: Shammon FY <zjur...@gmail.com>
AuthorDate: Tue Jul 4 19:00:06 2023 +0800

    [FLINK-32512][table] Don't register resource to user resource manager when 
creating temporary function
---
 .../apache/flink/util/ChildFirstClassLoader.java   |   6 +
 .../flink/util/FlinkUserCodeClassLoader.java       |   2 +-
 .../flink/util/FlinkUserCodeClassLoaders.java      |  11 ++
 .../apache/flink/util/MutableURLClassLoader.java   |   8 ++
 .../flink/util/FlinkUserCodeClassLoaderTest.java   |   5 +
 .../HadoopPathBasedBulkFormatBuilderTest.java      |   7 +
 .../src/test/resources/sql/function.q              |   8 ++
 .../flink/table/catalog/FunctionCatalog.java       |  51 +++++--
 .../flink/table/resource/ResourceManager.java      | 149 ++++++++++++++++++---
 .../flink/table/resource/ResourceManagerTest.java  | 116 ++++++++++++++++
 .../planner/runtime/stream/sql/FunctionITCase.java |   7 +-
 11 files changed, 341 insertions(+), 29 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java 
b/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java
index 727e7731815..8f10330ee6e 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java
@@ -127,4 +127,10 @@ public final class ChildFirstClassLoader extends 
FlinkUserCodeClassLoader {
     static {
         ClassLoader.registerAsParallelCapable();
     }
+
+    @Override
+    public MutableURLClassLoader copy() {
+        return new ChildFirstClassLoader(
+                getURLs(), getParent(), alwaysParentFirstPatterns, 
classLoadingExceptionHandler);
+    }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java 
b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
index 66c826cea6e..cd8eb01016d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java
@@ -32,7 +32,7 @@ public abstract class FlinkUserCodeClassLoader extends 
MutableURLClassLoader {
         ClassLoader.registerAsParallelCapable();
     }
 
-    private final Consumer<Throwable> classLoadingExceptionHandler;
+    protected final Consumer<Throwable> classLoadingExceptionHandler;
 
     protected FlinkUserCodeClassLoader(URL[] urls, ClassLoader parent) {
         this(urls, parent, NOOP_EXCEPTION_HANDLER);
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java 
b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
index 791b707a7ad..08f31518efa 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
@@ -139,6 +139,11 @@ public class FlinkUserCodeClassLoaders {
         static {
             ClassLoader.registerAsParallelCapable();
         }
+
+        @Override
+        public MutableURLClassLoader copy() {
+            return new ParentFirstClassLoader(getURLs(), getParent(), 
classLoadingExceptionHandler);
+        }
     }
 
     /**
@@ -203,6 +208,12 @@ public class FlinkUserCodeClassLoaders {
             ensureInner().addURL(url);
         }
 
+        @Override
+        public MutableURLClassLoader copy() {
+            return new SafetyNetWrapperClassLoader(
+                    (FlinkUserCodeClassLoader) inner.copy(), getParent());
+        }
+
         @Override
         public URL getResource(String name) {
             return ensureInner().getResource(name);
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java 
b/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java
index 42bf55f131f..62ad58f7315 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java
@@ -39,4 +39,12 @@ public abstract class MutableURLClassLoader extends 
URLClassLoader {
     public void addURL(URL url) {
         super.addURL(url);
     }
+
+    /**
+     * Creates a copy of this classloader so that each job can add its own jar files to its copy
+     * independently.
+     *
+     * @return the copied classloader
+     */
+    public abstract MutableURLClassLoader copy();
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java
 
b/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java
index 401945c110a..6c89f6d9703 100644
--- 
a/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java
@@ -58,5 +58,10 @@ public class FlinkUserCodeClassLoaderTest extends TestLogger 
{
         protected Class<?> loadClassWithoutExceptionHandling(String name, 
boolean resolve) {
             throw expectedException;
         }
+
+        @Override
+        public MutableURLClassLoader copy() {
+            return new ThrowingURLClassLoader(classLoadingExceptionHandler, 
expectedException);
+        }
     }
 }
diff --git 
a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
 
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
index 161a37280e5..3dffbbe9f9c 100644
--- 
a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
+++ 
b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter;
 import 
org.apache.flink.formats.hadoop.bulk.TestHadoopPathBasedBulkWriterFactory;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
 import org.apache.flink.util.FlinkUserCodeClassLoader;
+import org.apache.flink.util.MutableURLClassLoader;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -97,5 +98,11 @@ public class HadoopPathBasedBulkFormatBuilderTest {
                 return super.loadClassWithoutExceptionHandling(name, resolve);
             }
         }
+
+        @Override
+        public MutableURLClassLoader copy() {
+            return new SpecifiedChildFirstUserClassLoader(
+                    specifiedClassName, getParent(), getURLs());
+        }
     }
 }
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q 
b/flink-table/flink-sql-client/src/test/resources/sql/function.q
index c554f6e6352..320fbefc898 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/function.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q
@@ -294,6 +294,14 @@ SHOW JARS;
 Empty set
 !ok
 
+create temporary function temp_upperudf AS 'UpperUDF' using jar 
'$VAR_UDF_JAR_PATH';
+[INFO] Execute statement succeed.
+!info
+
+SHOW JARS;
+Empty set
+!ok
+
 create function upperudf AS 'UpperUDF' using jar '$VAR_UDF_JAR_PATH';
 [INFO] Execute statement succeed.
 !info
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 5ba7f831102..8eb8f82351f 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -47,8 +47,13 @@ import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URLClassLoader;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -135,10 +140,17 @@ public final class FunctionCatalog {
                             "Could not drop temporary system function. A 
function named '%s' doesn't exist.",
                             name));
         }
+        unregisterFunctionJarResources(function);
 
         return function != null;
     }
 
+    private void unregisterFunctionJarResources(@Nullable CatalogFunction 
function) {
+        if (function != null && function.getFunctionLanguage() == 
FunctionLanguage.JAVA) {
+            
resourceManager.unregisterFunctionResources(function.getFunctionResources());
+        }
+    }
+
     /** Registers a temporary catalog function. */
     public void registerTemporaryCatalogFunction(
             UnresolvedIdentifier unresolvedIdentifier,
@@ -523,6 +535,7 @@ public final class FunctionCatalog {
                     .getTemporaryOperationListener(normalizedName)
                     .ifPresent(l -> 
l.onDropTemporaryFunction(normalizedName.toObjectPath()));
             tempCatalogFunctions.remove(normalizedName);
+            unregisterFunctionJarResources(fd);
         } else if (!ignoreIfNotExist) {
             throw new ValidationException(
                     String.format("Temporary catalog function %s doesn't 
exist", identifier));
@@ -614,6 +627,8 @@ public final class FunctionCatalog {
         CatalogFunction potentialResult = 
tempCatalogFunctions.get(normalizedIdentifier);
 
         if (potentialResult != null) {
+            registerFunctionJarResources(
+                    oi.asSummaryString(), 
potentialResult.getFunctionResources());
             return Optional.of(
                     ContextResolvedFunction.temporary(
                             FunctionIdentifier.of(oi),
@@ -664,11 +679,12 @@ public final class FunctionCatalog {
 
         String normalizedName = FunctionIdentifier.normalizeName(funcName);
         if (tempSystemFunctions.containsKey(normalizedName)) {
+            CatalogFunction function = tempSystemFunctions.get(normalizedName);
+            registerFunctionJarResources(funcName, 
function.getFunctionResources());
             return Optional.of(
                     ContextResolvedFunction.temporary(
                             FunctionIdentifier.of(funcName),
-                            getFunctionDefinition(
-                                    normalizedName, 
tempSystemFunctions.get(normalizedName))));
+                            getFunctionDefinition(normalizedName, function)));
         }
 
         Optional<FunctionDefinition> candidate =
@@ -685,7 +701,7 @@ public final class FunctionCatalog {
 
     @SuppressWarnings("unchecked")
     private void validateAndPrepareFunction(String name, CatalogFunction 
function)
-            throws ClassNotFoundException {
+            throws ClassNotFoundException, IOException {
         // If the input is instance of UserDefinedFunction, it means it uses 
the new type inference.
         // In this situation the UDF have not been validated and cleaned, so 
we need to validate it
         // and clean its closure here.
@@ -701,13 +717,30 @@ public final class FunctionCatalog {
         } else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
             // If the jar resource of UDF used is not empty, register it to 
classloader before
             // validate.
-            registerFunctionJarResources(name, 
function.getFunctionResources());
+            List<ResourceUri> resourceUris = function.getFunctionResources();
+            try {
+                if (!resourceUris.isEmpty()) {
+                    resourceManager.declareFunctionResources(
+                            new HashSet<>(function.getFunctionResources()));
+                }
+            } catch (Exception e) {
+                throw new TableException(
+                        String.format(
+                                "Failed to register function jar resource '%s' 
of function '%s'.",
+                                resourceUris, name),
+                        e);
+            }
 
-            UserDefinedFunctionHelper.validateClass(
-                    (Class<? extends UserDefinedFunction>)
-                            resourceManager
-                                    .getUserClassLoader()
-                                    .loadClass(function.getClassName()));
+            URLClassLoader classLoader = 
resourceManager.createUserClassLoader(resourceUris);
+            try {
+                UserDefinedFunctionHelper.validateClass(
+                        (Class<? extends UserDefinedFunction>)
+                                
classLoader.loadClass(function.getClassName()));
+            } finally {
+                if (!resourceUris.isEmpty()) {
+                    classLoader.close();
+                }
+            }
         }
     }
 
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
index 2cde34e11f6..5de2b5d6ef4 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
@@ -45,8 +45,10 @@ import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +57,9 @@ import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** A manager for dealing with all user defined resource. */
 @Internal
 public class ResourceManager implements Closeable {
@@ -65,6 +70,9 @@ public class ResourceManager implements Closeable {
     private static final String FILE_SCHEME = "file";
 
     private final Path localResourceDir;
+    /** Resource infos for functions. */
+    private final Map<ResourceUri, ResourceCounter> functionResourceInfos;
+
     protected final Map<ResourceUri, URL> resourceInfos;
     protected final MutableURLClassLoader userClassLoader;
 
@@ -80,6 +88,7 @@ public class ResourceManager implements Closeable {
                 new Path(
                         config.get(TableConfigOptions.RESOURCES_DOWNLOAD_DIR),
                         String.format("flink-table-%s", UUID.randomUUID()));
+        this.functionResourceInfos = new HashMap<>();
         this.resourceInfos = new HashMap<>();
         this.userClassLoader = userClassLoader;
     }
@@ -103,7 +112,8 @@ public class ResourceManager implements Closeable {
                                         String.format("Failed to register jar 
resource [%s]", url),
                                         e);
                             }
-                        }),
+                        },
+                        false),
                 true);
     }
 
@@ -124,15 +134,69 @@ public class ResourceManager implements Closeable {
                         Collections.singletonList(resourceUri),
                         ResourceType.FILE,
                         false,
-                        url -> {});
+                        url -> {},
+                        false);
         registerResources(stagingResources, false);
         return resourceInfos.get(new 
ArrayList<>(stagingResources.keySet()).get(0)).getPath();
     }
 
+    /**
+     * Declares resources for a function and adds them to the function resource infos. If a file
+     * is remote, it will be copied to a local file. A declared resource will not be added to the
+     * registered resources and the user classloader unless it is used in a job.
+     *
+     * @param resourceUris the resource uri for function.
+     */
+    public void declareFunctionResources(Set<ResourceUri> resourceUris) throws 
IOException {
+        prepareStagingResources(
+                resourceUris,
+                ResourceType.JAR,
+                true,
+                url -> {
+                    try {
+                        JarUtils.checkJarFile(url);
+                    } catch (IOException e) {
+                        throw new ValidationException(
+                                String.format("Failed to register jar resource 
[%s]", url), e);
+                    }
+                },
+                true);
+    }
+
+    /**
+     * Unregisters the given resource uris from the function resources. When the reference count
+     * of a resource reaches 0, the resource is removed from the function resources.
+     *
+     * @param resourceUris the uris to unregister in function resources.
+     */
+    public void unregisterFunctionResources(List<ResourceUri> resourceUris) {
+        if (!resourceUris.isEmpty()) {
+            resourceUris.forEach(
+                    uri -> {
+                        ResourceCounter counter = 
functionResourceInfos.get(uri);
+                        if (counter != null && counter.decreaseCounter()) {
+                            functionResourceInfos.remove(uri);
+                        }
+                    });
+        }
+    }
+
     public URLClassLoader getUserClassLoader() {
         return userClassLoader;
     }
 
+    public URLClassLoader createUserClassLoader(List<ResourceUri> 
resourceUris) {
+        if (resourceUris.isEmpty()) {
+            return userClassLoader;
+        }
+        MutableURLClassLoader classLoader = userClassLoader.copy();
+        for (ResourceUri resourceUri : new HashSet<>(resourceUris)) {
+            
classLoader.addURL(checkNotNull(functionResourceInfos.get(resourceUri)).url);
+        }
+
+        return classLoader;
+    }
+
     public Map<ResourceUri, URL> getResources() {
         return Collections.unmodifiableMap(resourceInfos);
     }
@@ -171,6 +235,7 @@ public class ResourceManager implements Closeable {
     @Override
     public void close() throws IOException {
         resourceInfos.clear();
+        functionResourceInfos.clear();
 
         IOException exception = null;
         try {
@@ -309,6 +374,11 @@ public class ResourceManager implements Closeable {
         }
     }
 
+    @VisibleForTesting
+    Map<ResourceUri, ResourceCounter> functionResourceInfos() {
+        return functionResourceInfos;
+    }
+
     private Path getResourceLocalPath(Path remotePath) {
         String fileName = remotePath.getName();
         String fileExtension = Files.getFileExtension(fileName);
@@ -327,7 +397,7 @@ public class ResourceManager implements Closeable {
         return new Path(localResourceDir, fileNameWithUUID);
     }
 
-    private void checkResources(List<ResourceUri> resourceUris, ResourceType 
expectedType)
+    private void checkResources(Collection<ResourceUri> resourceUris, 
ResourceType expectedType)
             throws IOException {
         // check the resource type
         if (resourceUris.stream()
@@ -360,10 +430,11 @@ public class ResourceManager implements Closeable {
     }
 
     private Map<ResourceUri, URL> prepareStagingResources(
-            List<ResourceUri> resourceUris,
+            Collection<ResourceUri> resourceUris,
             ResourceType expectedType,
             boolean executable,
-            Consumer<URL> resourceChecker)
+            Consumer<URL> resourceChecker,
+            boolean declareFunctionResource)
             throws IOException {
         checkResources(resourceUris, expectedType);
 
@@ -381,24 +452,41 @@ public class ResourceManager implements Closeable {
                 }
             }
 
-            // here can check whether the resource path is valid
-            Path path = new Path(resourceUri.getUri());
             URL localUrl;
-            // download resource to a local path firstly if in remote
-            if (isRemotePath(path)) {
-                localUrl = downloadResource(path, executable);
+            ResourceUri localResourceUri = resourceUri;
+            if (expectedType == ResourceType.JAR
+                    && functionResourceInfos.containsKey(resourceUri)) {
+                // Get local url from function resource infos.
+                localUrl = functionResourceInfos.get(resourceUri).url;
+                // Register resource uri to increase the reference counter
+                functionResourceInfos
+                        .computeIfAbsent(resourceUri, key -> new 
ResourceCounter(localUrl))
+                        .increaseCounter();
             } else {
-                localUrl = getURLFromPath(path);
-                // if the local resource is a relative path, here convert it 
to an absolute path
-                // before register
-                resourceUri = new ResourceUri(expectedType, 
localUrl.getPath());
-            }
+                // here can check whether the resource path is valid
+                Path path = new Path(resourceUri.getUri());
+                // download resource to a local path firstly if in remote
+                if (isRemotePath(path)) {
+                    localUrl = downloadResource(path, executable);
+                } else {
+                    localUrl = getURLFromPath(path);
+                    // if the local resource is a relative path, here convert 
it to an absolute path
+                    // before register
+                    localResourceUri = new ResourceUri(expectedType, 
localUrl.getPath());
+                }
 
-            // check the local file
-            resourceChecker.accept(localUrl);
+                // check the local file
+                resourceChecker.accept(localUrl);
+
+                if (declareFunctionResource) {
+                    functionResourceInfos
+                            .computeIfAbsent(resourceUri, key -> new 
ResourceCounter(localUrl))
+                            .increaseCounter();
+                }
+            }
 
             // add it to a staging map
-            stagingResourceLocalURLs.put(resourceUri, localUrl);
+            stagingResourceLocalURLs.put(localResourceUri, localUrl);
         }
         return stagingResourceLocalURLs;
     }
@@ -419,4 +507,29 @@ public class ResourceManager implements Closeable {
                     LOG.info("Register resource [{}] successfully.", 
resourceUri.getUri());
                 });
     }
+
+    /**
+     * Resource with a reference counter. When the counter is 0, the resource can be removed.
+     */
+    static class ResourceCounter {
+        final URL url;
+        int counter;
+
+        private ResourceCounter(URL url) {
+            this.url = url;
+            this.counter = 0;
+        }
+
+        private void increaseCounter() {
+            this.counter++;
+        }
+
+        private boolean decreaseCounter() {
+            this.counter--;
+            checkState(
+                    this.counter >= 0,
+                    String.format("Invalid reference count[%d] which must >= 
0", this.counter));
+            return this.counter == 0;
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
index bc18ce9bfd1..42305d806af 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
@@ -24,6 +24,13 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.UserClassLoaderJarTestUtils;
 
@@ -47,9 +54,13 @@ import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG;
+import static 
org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE;
 import static 
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
 import static 
org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -60,6 +71,14 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 /** Tests for {@link ResourceManager}. */
 public class ResourceManagerTest {
+    private static final UnresolvedIdentifier FULL_UNRESOLVED_IDENTIFIER1 =
+            UnresolvedIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, 
"test_udf1");
+
+    private static final UnresolvedIdentifier FULL_UNRESOLVED_IDENTIFIER2 =
+            UnresolvedIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, 
"test_udf2");
+
+    private static final UnresolvedIdentifier FULL_UNRESOLVED_IDENTIFIER3 =
+            UnresolvedIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, 
"test_udf3");
 
     @TempDir private static File tempFolder;
     private static File udfJar;
@@ -253,6 +272,38 @@ public class ResourceManagerTest {
                 });
     }
 
+    @Test
+    public void testRegisterFunctionResource() throws Exception {
+        URLClassLoader userClassLoader = resourceManager.getUserClassLoader();
+
+        // test class loading before register function resource
+        CommonTestUtils.assertThrows(
+                "LowerUDF",
+                ClassNotFoundException.class,
+                () -> Class.forName(GENERATED_LOWER_UDF_CLASS, false, 
userClassLoader));
+
+        ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, 
udfJar.getPath());
+        // register the same jar repeatedly
+        resourceManager.declareFunctionResources(
+                new HashSet<>(Arrays.asList(resourceUri, resourceUri)));
+
+        // test class loading after register function resource
+        CommonTestUtils.assertThrows(
+                "LowerUDF",
+                ClassNotFoundException.class,
+                () -> Class.forName(GENERATED_LOWER_UDF_CLASS, false, 
userClassLoader));
+
+        URLClassLoader functionClassLoader =
+                
resourceManager.createUserClassLoader(Arrays.asList(resourceUri, resourceUri));
+        // test load class
+        final Class<?> clazz1 =
+                Class.forName(GENERATED_LOWER_UDF_CLASS, false, 
functionClassLoader);
+        final Class<?> clazz2 =
+                Class.forName(GENERATED_LOWER_UDF_CLASS, false, 
functionClassLoader);
+
+        assertEquals(clazz1, clazz2);
+    }
+
     @MethodSource("provideResource")
     @ParameterizedTest
     public void testDownloadResource(String pathString, boolean executable) 
throws Exception {
@@ -315,6 +366,71 @@ public class ResourceManagerTest {
         assertThat(FileUtils.readFileUtf8(new File(targetUri))).isEqualTo("Bye 
Bye");
     }
 
+    @Test
+    void testRegisterFunctionWithResource() {
+        ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, 
udfJar.getPath());
+        List<ResourceUri> resourceUris = 
Collections.singletonList(resourceUri);
+
+        Configuration configuration = new Configuration();
+        FunctionCatalog functionCatalog =
+                new FunctionCatalog(
+                        configuration,
+                        resourceManager,
+                        CatalogManagerMocks.preparedCatalogManager()
+                                .defaultCatalog(
+                                        DEFAULT_CATALOG,
+                                        new GenericInMemoryCatalog(
+                                                DEFAULT_CATALOG, 
DEFAULT_DATABASE))
+                                .build(),
+                        new ModuleManager());
+
+        functionCatalog.registerCatalogFunction(
+                FULL_UNRESOLVED_IDENTIFIER1, GENERATED_LOWER_UDF_CLASS, 
resourceUris, false);
+
+        Map<ResourceUri, ResourceManager.ResourceCounter> 
functionResourceInfos =
+                resourceManager.functionResourceInfos();
+        // Register catalog function will not register its resource to 
function resources.
+        assertThat(functionResourceInfos.containsKey(resourceUri)).isFalse();
+        functionCatalog.dropCatalogFunction(FULL_UNRESOLVED_IDENTIFIER1, 
false);
+
+        // Register catalog function again to validate that unregister catalog 
function will not
+        // decrease the reference count of resourceUris.
+        functionCatalog.registerCatalogFunction(
+                FULL_UNRESOLVED_IDENTIFIER1, GENERATED_LOWER_UDF_CLASS, 
resourceUris, false);
+        functionCatalog.registerTemporaryCatalogFunction(
+                FULL_UNRESOLVED_IDENTIFIER2,
+                new CatalogFunctionImpl(
+                        GENERATED_LOWER_UDF_CLASS, FunctionLanguage.JAVA, 
resourceUris),
+                false);
+        functionCatalog.registerTemporaryCatalogFunction(
+                FULL_UNRESOLVED_IDENTIFIER3,
+                new CatalogFunctionImpl(
+                        GENERATED_LOWER_UDF_CLASS, FunctionLanguage.JAVA, 
resourceUris),
+                false);
+        functionCatalog.registerTemporarySystemFunction(
+                GENERATED_LOWER_UDF_CLASS,
+                new CatalogFunctionImpl(
+                        GENERATED_LOWER_UDF_CLASS, FunctionLanguage.JAVA, 
resourceUris),
+                false);
+
+        // The reference count is three: two temporary catalog functions and one temporary system
+        // function. The (non-temporary) catalog function is not counted.
+        
assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(3);
+        // Drop catalog function will not decrease the reference count of 
resourceUris.
+        functionCatalog.dropCatalogFunction(FULL_UNRESOLVED_IDENTIFIER1, 
false);
+        // There will be three resources for temporary and system functions.
+        
assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(3);
+
+        
functionCatalog.dropTemporaryCatalogFunction(FULL_UNRESOLVED_IDENTIFIER2, 
false);
+        
assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(2);
+
+        
functionCatalog.dropTemporaryCatalogFunction(FULL_UNRESOLVED_IDENTIFIER3, 
false);
+        
assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(1);
+
+        functionCatalog.dropTemporarySystemFunction(GENERATED_LOWER_UDF_CLASS, 
false);
+        assertThat(functionResourceInfos.containsKey(resourceUri)).isFalse();
+    }
+
     @Test
     public void testCloseResourceManagerCleanDownloadedResources() throws 
Exception {
         resourceManager.close();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index 0daa554d6e6..e94d12d7ee9 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -52,6 +52,7 @@ import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.table.types.logical.RawType;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -229,7 +230,7 @@ public class FunctionITCase extends StreamingTestBase {
     }
 
     @Test
-    public void testCreateTemporarySystemFunctionByUsingJar() {
+    public void testCreateTemporarySystemFunctionByUsingJar() throws Exception 
{
         String ddl =
                 String.format(
                         "CREATE TEMPORARY SYSTEM FUNCTION f10 AS '%s' USING 
JAR '%s'",
@@ -237,6 +238,10 @@ public class FunctionITCase extends StreamingTestBase {
         tEnv().executeSql(ddl);
         assertThat(Arrays.asList(tEnv().listFunctions())).contains("f10");
 
+        try (CloseableIterator<Row> itor = tEnv().executeSql("SHOW 
JARS").collect()) {
+            assertThat(itor.hasNext()).isFalse();
+        }
+
         tEnv().executeSql("DROP TEMPORARY SYSTEM FUNCTION f10");
         
assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f10");
     }

Reply via email to