This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c2e14ff411e [FLINK-32512][table] Don't register resource to user resource manager when creating temporary function c2e14ff411e is described below commit c2e14ff411e806f9ccf176c85eb8249b8ff12e56 Author: Shammon FY <zjur...@gmail.com> AuthorDate: Tue Jul 4 19:00:06 2023 +0800 [FLINK-32512][table] Don't register resource to user resource manager when creating temporary function --- .../apache/flink/util/ChildFirstClassLoader.java | 6 + .../flink/util/FlinkUserCodeClassLoader.java | 2 +- .../flink/util/FlinkUserCodeClassLoaders.java | 11 ++ .../apache/flink/util/MutableURLClassLoader.java | 8 ++ .../flink/util/FlinkUserCodeClassLoaderTest.java | 5 + .../HadoopPathBasedBulkFormatBuilderTest.java | 7 + .../src/test/resources/sql/function.q | 8 ++ .../flink/table/catalog/FunctionCatalog.java | 51 +++++-- .../flink/table/resource/ResourceManager.java | 149 ++++++++++++++++++--- .../flink/table/resource/ResourceManagerTest.java | 116 ++++++++++++++++ .../planner/runtime/stream/sql/FunctionITCase.java | 7 +- 11 files changed, 341 insertions(+), 29 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java b/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java index 727e7731815..8f10330ee6e 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java +++ b/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java @@ -127,4 +127,10 @@ public final class ChildFirstClassLoader extends FlinkUserCodeClassLoader { static { ClassLoader.registerAsParallelCapable(); } + + @Override + public MutableURLClassLoader copy() { + return new ChildFirstClassLoader( + getURLs(), getParent(), alwaysParentFirstPatterns, classLoadingExceptionHandler); + } } diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java index 66c826cea6e..cd8eb01016d 100644 --- a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java +++ b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoader.java @@ -32,7 +32,7 @@ public abstract class FlinkUserCodeClassLoader extends MutableURLClassLoader { ClassLoader.registerAsParallelCapable(); } - private final Consumer<Throwable> classLoadingExceptionHandler; + protected final Consumer<Throwable> classLoadingExceptionHandler; protected FlinkUserCodeClassLoader(URL[] urls, ClassLoader parent) { this(urls, parent, NOOP_EXCEPTION_HANDLER); diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java index 791b707a7ad..08f31518efa 100644 --- a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java +++ b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java @@ -139,6 +139,11 @@ public class FlinkUserCodeClassLoaders { static { ClassLoader.registerAsParallelCapable(); } + + @Override + public MutableURLClassLoader copy() { + return new ParentFirstClassLoader(getURLs(), getParent(), classLoadingExceptionHandler); + } } /** @@ -203,6 +208,12 @@ public class FlinkUserCodeClassLoaders { ensureInner().addURL(url); } + @Override + public MutableURLClassLoader copy() { + return new SafetyNetWrapperClassLoader( + (FlinkUserCodeClassLoader) inner.copy(), getParent()); + } + @Override public URL getResource(String name) { return ensureInner().getResource(name); diff --git a/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java b/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java index 42bf55f131f..62ad58f7315 100644 --- a/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java +++ b/flink-core/src/main/java/org/apache/flink/util/MutableURLClassLoader.java @@ -39,4 +39,12 @@ public abstract class MutableURLClassLoader extends URLClassLoader { public void addURL(URL url) { super.addURL(url); } + + /** + * Copy the classloader for each job and these jobs can add their jar files to the classloader + * independently. + * + * @return the copied classloader + */ + public abstract MutableURLClassLoader copy(); } diff --git a/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java b/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java index 401945c110a..6c89f6d9703 100644 --- a/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/FlinkUserCodeClassLoaderTest.java @@ -58,5 +58,10 @@ public class FlinkUserCodeClassLoaderTest extends TestLogger { protected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve) { throw expectedException; } + + @Override + public MutableURLClassLoader copy() { + return new ThrowingURLClassLoader(classLoadingExceptionHandler, expectedException); + } } } diff --git a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java index 161a37280e5..3dffbbe9f9c 100644 --- a/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java +++ b/flink-formats/flink-hadoop-bulk/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilderTest.java @@ -22,6 +22,7 @@ import org.apache.flink.formats.hadoop.bulk.HadoopPathBasedBulkWriter; import org.apache.flink.formats.hadoop.bulk.TestHadoopPathBasedBulkWriterFactory; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.util.FlinkUserCodeClassLoader; +import org.apache.flink.util.MutableURLClassLoader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -97,5 +98,11 @@ public class HadoopPathBasedBulkFormatBuilderTest { return super.loadClassWithoutExceptionHandling(name, resolve); } } + + @Override + public MutableURLClassLoader copy() { + return new SpecifiedChildFirstUserClassLoader( + specifiedClassName, getParent(), getURLs()); + } } } diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q b/flink-table/flink-sql-client/src/test/resources/sql/function.q index c554f6e6352..320fbefc898 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/function.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q @@ -294,6 +294,14 @@ SHOW JARS; Empty set !ok +create temporary function temp_upperudf AS 'UpperUDF' using jar '$VAR_UDF_JAR_PATH'; +[INFO] Execute statement succeed. +!info + +SHOW JARS; +Empty set +!ok + create function upperudf AS 'UpperUDF' using jar '$VAR_UDF_JAR_PATH'; [INFO] Execute statement succeed. !info diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java index 5ba7f831102..8eb8f82351f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java @@ -47,8 +47,13 @@ import org.apache.flink.table.resource.ResourceUri; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.net.URLClassLoader; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -135,10 +140,17 @@ public final class FunctionCatalog { "Could not drop temporary system function. A function named '%s' doesn't exist.", name)); } + unregisterFunctionJarResources(function); return function != null; } + private void unregisterFunctionJarResources(@Nullable CatalogFunction function) { + if (function != null && function.getFunctionLanguage() == FunctionLanguage.JAVA) { + resourceManager.unregisterFunctionResources(function.getFunctionResources()); + } + } + /** Registers a temporary catalog function. */ public void registerTemporaryCatalogFunction( UnresolvedIdentifier unresolvedIdentifier, @@ -523,6 +535,7 @@ public final class FunctionCatalog { .getTemporaryOperationListener(normalizedName) .ifPresent(l -> l.onDropTemporaryFunction(normalizedName.toObjectPath())); tempCatalogFunctions.remove(normalizedName); + unregisterFunctionJarResources(fd); } else if (!ignoreIfNotExist) { throw new ValidationException( String.format("Temporary catalog function %s doesn't exist", identifier)); @@ -614,6 +627,8 @@ public final class FunctionCatalog { CatalogFunction potentialResult = tempCatalogFunctions.get(normalizedIdentifier); if (potentialResult != null) { + registerFunctionJarResources( + oi.asSummaryString(), potentialResult.getFunctionResources()); return Optional.of( ContextResolvedFunction.temporary( FunctionIdentifier.of(oi), @@ -664,11 +679,12 @@ public final class FunctionCatalog { String normalizedName = FunctionIdentifier.normalizeName(funcName); if (tempSystemFunctions.containsKey(normalizedName)) { + CatalogFunction function = tempSystemFunctions.get(normalizedName); + registerFunctionJarResources(funcName, function.getFunctionResources()); return Optional.of( ContextResolvedFunction.temporary( FunctionIdentifier.of(funcName), - getFunctionDefinition( - normalizedName, tempSystemFunctions.get(normalizedName)))); + getFunctionDefinition(normalizedName, function))); } Optional<FunctionDefinition> candidate = @@ -685,7 +701,7 @@ public final class FunctionCatalog { @SuppressWarnings("unchecked") private void validateAndPrepareFunction(String name, CatalogFunction function) - throws ClassNotFoundException { + throws ClassNotFoundException, IOException { // If the input is instance of UserDefinedFunction, it means it uses the new type inference. // In this situation the UDF have not been validated and cleaned, so we need to validate it // and clean its closure here. @@ -701,13 +717,30 @@ public final class FunctionCatalog { } else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) { // If the jar resource of UDF used is not empty, register it to classloader before // validate. - registerFunctionJarResources(name, function.getFunctionResources()); + List<ResourceUri> resourceUris = function.getFunctionResources(); + try { + if (!resourceUris.isEmpty()) { + resourceManager.declareFunctionResources( + new HashSet<>(function.getFunctionResources())); + } + } catch (Exception e) { + throw new TableException( + String.format( + "Failed to register function jar resource '%s' of function '%s'.", + resourceUris, name), + e); + } - UserDefinedFunctionHelper.validateClass( - (Class<? extends UserDefinedFunction>) - resourceManager - .getUserClassLoader() - .loadClass(function.getClassName())); + URLClassLoader classLoader = resourceManager.createUserClassLoader(resourceUris); + try { + UserDefinedFunctionHelper.validateClass( + (Class<? extends UserDefinedFunction>) + classLoader.loadClass(function.getClassName())); + } finally { + if (!resourceUris.isEmpty()) { + classLoader.close(); + } + } } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java index 2cde34e11f6..5de2b5d6ef4 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java @@ -45,8 +45,10 @@ import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -55,6 +57,9 @@ import java.util.UUID; import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + /** A manager for dealing with all user defined resource. */ @Internal public class ResourceManager implements Closeable { @@ -65,6 +70,9 @@ public class ResourceManager implements Closeable { private static final String FILE_SCHEME = "file"; private final Path localResourceDir; + /** Resource infos for functions. */ + private final Map<ResourceUri, ResourceCounter> functionResourceInfos; + protected final Map<ResourceUri, URL> resourceInfos; protected final MutableURLClassLoader userClassLoader; @@ -80,6 +88,7 @@ public class ResourceManager implements Closeable { new Path( config.get(TableConfigOptions.RESOURCES_DOWNLOAD_DIR), String.format("flink-table-%s", UUID.randomUUID())); + this.functionResourceInfos = new HashMap<>(); this.resourceInfos = new HashMap<>(); this.userClassLoader = userClassLoader; } @@ -103,7 +112,8 @@ public class ResourceManager implements Closeable { String.format("Failed to register jar resource [%s]", url), e); } - }), + }, + false), true); } @@ -124,15 +134,69 @@ public class ResourceManager implements Closeable { Collections.singletonList(resourceUri), ResourceType.FILE, false, - url -> {}); + url -> {}, + false); registerResources(stagingResources, false); return resourceInfos.get(new ArrayList<>(stagingResources.keySet()).get(0)).getPath(); } + /** + * Declare a resource for function and add it to the function resource infos. If the file is + * remote, it will be copied to a local file. The declared resource will not be added to + * resources and classloader if it is not used in the job. + * + * @param resourceUris the resource uri for function. + */ + public void declareFunctionResources(Set<ResourceUri> resourceUris) throws IOException { + prepareStagingResources( + resourceUris, + ResourceType.JAR, + true, + url -> { + try { + JarUtils.checkJarFile(url); + } catch (IOException e) { + throw new ValidationException( + String.format("Failed to register jar resource [%s]", url), e); + } + }, + true); + } + + /** + * Unregister the resource uri in function resources, when the reference count of the resource + * is 0, the resource will be removed from the function resources. + * + * @param resourceUris the uris to unregister in function resources. + */ + public void unregisterFunctionResources(List<ResourceUri> resourceUris) { + if (!resourceUris.isEmpty()) { + resourceUris.forEach( + uri -> { + ResourceCounter counter = functionResourceInfos.get(uri); + if (counter != null && counter.decreaseCounter()) { + functionResourceInfos.remove(uri); + } + }); + } + } + public URLClassLoader getUserClassLoader() { return userClassLoader; } + public URLClassLoader createUserClassLoader(List<ResourceUri> resourceUris) { + if (resourceUris.isEmpty()) { + return userClassLoader; + } + MutableURLClassLoader classLoader = userClassLoader.copy(); + for (ResourceUri resourceUri : new HashSet<>(resourceUris)) { + classLoader.addURL(checkNotNull(functionResourceInfos.get(resourceUri)).url); + } + + return classLoader; + } + public Map<ResourceUri, URL> getResources() { return Collections.unmodifiableMap(resourceInfos); } @@ -171,6 +235,7 @@ public class ResourceManager implements Closeable { @Override public void close() throws IOException { resourceInfos.clear(); + functionResourceInfos.clear(); IOException exception = null; try { @@ -309,6 +374,11 @@ public class ResourceManager implements Closeable { } } + @VisibleForTesting + Map<ResourceUri, ResourceCounter> functionResourceInfos() { + return functionResourceInfos; + } + private Path getResourceLocalPath(Path remotePath) { String fileName = remotePath.getName(); String fileExtension = Files.getFileExtension(fileName); @@ -327,7 +397,7 @@ public class ResourceManager implements Closeable { return new Path(localResourceDir, fileNameWithUUID); } - private void checkResources(List<ResourceUri> resourceUris, ResourceType expectedType) + private void checkResources(Collection<ResourceUri> resourceUris, ResourceType expectedType) throws IOException { // check the resource type if (resourceUris.stream() @@ -360,10 +430,11 @@ public class ResourceManager implements Closeable { } private Map<ResourceUri, URL> prepareStagingResources( - List<ResourceUri> resourceUris, + Collection<ResourceUri> resourceUris, ResourceType expectedType, boolean executable, - Consumer<URL> resourceChecker) + Consumer<URL> resourceChecker, + boolean declareFunctionResource) throws IOException { checkResources(resourceUris, expectedType); @@ -381,24 +452,41 @@ public class ResourceManager implements Closeable { } } - // here can check whether the resource path is valid - Path path = new Path(resourceUri.getUri()); URL localUrl; - // download resource to a local path firstly if in remote - if (isRemotePath(path)) { - localUrl = downloadResource(path, executable); + ResourceUri localResourceUri = resourceUri; + if (expectedType == ResourceType.JAR + && functionResourceInfos.containsKey(resourceUri)) { + // Get local url from function resource infos. + localUrl = functionResourceInfos.get(resourceUri).url; + // Register resource uri to increase the reference counter + functionResourceInfos + .computeIfAbsent(resourceUri, key -> new ResourceCounter(localUrl)) + .increaseCounter(); } else { - localUrl = getURLFromPath(path); - // if the local resource is a relative path, here convert it to an absolute path - // before register - resourceUri = new ResourceUri(expectedType, localUrl.getPath()); - } + // here can check whether the resource path is valid + Path path = new Path(resourceUri.getUri()); + // download resource to a local path firstly if in remote + if (isRemotePath(path)) { + localUrl = downloadResource(path, executable); + } else { + localUrl = getURLFromPath(path); + // if the local resource is a relative path, here convert it to an absolute path + // before register + localResourceUri = new ResourceUri(expectedType, localUrl.getPath()); + } - // check the local file - resourceChecker.accept(localUrl); + // check the local file + resourceChecker.accept(localUrl); + + if (declareFunctionResource) { + functionResourceInfos + .computeIfAbsent(resourceUri, key -> new ResourceCounter(localUrl)) + .increaseCounter(); + } + } // add it to a staging map - stagingResourceLocalURLs.put(resourceUri, localUrl); + stagingResourceLocalURLs.put(localResourceUri, localUrl); } return stagingResourceLocalURLs; } @@ -419,4 +507,29 @@ public class ResourceManager implements Closeable { LOG.info("Register resource [{}] successfully.", resourceUri.getUri()); }); } + + /** + * Resource with reference counter, when the counter is 0, it means the resource can be removed. + */ + static class ResourceCounter { + final URL url; + int counter; + + private ResourceCounter(URL url) { + this.url = url; + this.counter = 0; + } + + private void increaseCounter() { + this.counter++; + } + + private boolean decreaseCounter() { + this.counter--; + checkState( + this.counter >= 0, + String.format("Invalid reference count[%d] which must >= 0", this.counter)); + return this.counter == 0; + } + } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java index bc18ce9bfd1..42305d806af 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java @@ -24,6 +24,13 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogFunctionImpl; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.util.FileUtils; import org.apache.flink.util.UserClassLoaderJarTestUtils; @@ -47,9 +54,13 @@ import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.stream.Stream; +import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_CATALOG; +import static org.apache.flink.table.utils.CatalogManagerMocks.DEFAULT_DATABASE; import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS; import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE; import static org.assertj.core.api.Assertions.assertThat; @@ -60,6 +71,14 @@ import static org.junit.jupiter.api.Assertions.fail; /** Tests for {@link ResourceManager}. */ public class ResourceManagerTest { + private static final UnresolvedIdentifier FULL_UNRESOLVED_IDENTIFIER1 = + UnresolvedIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, "test_udf1"); + + private static final UnresolvedIdentifier FULL_UNRESOLVED_IDENTIFIER2 = + UnresolvedIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, "test_udf2"); + + private static final UnresolvedIdentifier FULL_UNRESOLVED_IDENTIFIER3 = + UnresolvedIdentifier.of(DEFAULT_CATALOG, DEFAULT_DATABASE, "test_udf3"); @TempDir private static File tempFolder; private static File udfJar; @@ -253,6 +272,38 @@ public class ResourceManagerTest { }); } + @Test + public void testRegisterFunctionResource() throws Exception { + URLClassLoader userClassLoader = resourceManager.getUserClassLoader(); + + // test class loading before register function resource + CommonTestUtils.assertThrows( + "LowerUDF", + ClassNotFoundException.class, + () -> Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader)); + + ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, udfJar.getPath()); + // register the same jar repeatedly + resourceManager.declareFunctionResources( + new HashSet<>(Arrays.asList(resourceUri, resourceUri))); + + // test class loading after register function resource + CommonTestUtils.assertThrows( + "LowerUDF", + ClassNotFoundException.class, + () -> Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader)); + + URLClassLoader functionClassLoader = + resourceManager.createUserClassLoader(Arrays.asList(resourceUri, resourceUri)); + // test load class + final Class<?> clazz1 = + Class.forName(GENERATED_LOWER_UDF_CLASS, false, functionClassLoader); + final Class<?> clazz2 = + Class.forName(GENERATED_LOWER_UDF_CLASS, false, functionClassLoader); + + assertEquals(clazz1, clazz2); + } + @MethodSource("provideResource") @ParameterizedTest public void testDownloadResource(String pathString, boolean executable) throws Exception { @@ -315,6 +366,71 @@ public class ResourceManagerTest { assertThat(FileUtils.readFileUtf8(new File(targetUri))).isEqualTo("Bye Bye"); } + @Test + void testRegisterFunctionWithResource() { + ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, udfJar.getPath()); + List<ResourceUri> resourceUris = Collections.singletonList(resourceUri); + + Configuration configuration = new Configuration(); + FunctionCatalog functionCatalog = + new FunctionCatalog( + configuration, + resourceManager, + CatalogManagerMocks.preparedCatalogManager() + .defaultCatalog( + DEFAULT_CATALOG, + new GenericInMemoryCatalog( + DEFAULT_CATALOG, DEFAULT_DATABASE)) + .build(), + new ModuleManager()); + + functionCatalog.registerCatalogFunction( + FULL_UNRESOLVED_IDENTIFIER1, GENERATED_LOWER_UDF_CLASS, resourceUris, false); + + Map<ResourceUri, ResourceManager.ResourceCounter> functionResourceInfos = + resourceManager.functionResourceInfos(); + // Register catalog function will not register its resource to function resources. + assertThat(functionResourceInfos.containsKey(resourceUri)).isFalse(); + functionCatalog.dropCatalogFunction(FULL_UNRESOLVED_IDENTIFIER1, false); + + // Register catalog function again to validate that unregister catalog function will not + // decrease the reference count of resourceUris. + functionCatalog.registerCatalogFunction( + FULL_UNRESOLVED_IDENTIFIER1, GENERATED_LOWER_UDF_CLASS, resourceUris, false); + functionCatalog.registerTemporaryCatalogFunction( + FULL_UNRESOLVED_IDENTIFIER2, + new CatalogFunctionImpl( + GENERATED_LOWER_UDF_CLASS, FunctionLanguage.JAVA, resourceUris), + false); + functionCatalog.registerTemporaryCatalogFunction( + FULL_UNRESOLVED_IDENTIFIER3, + new CatalogFunctionImpl( + GENERATED_LOWER_UDF_CLASS, FunctionLanguage.JAVA, resourceUris), + false); + functionCatalog.registerTemporarySystemFunction( + GENERATED_LOWER_UDF_CLASS, + new CatalogFunctionImpl( + GENERATED_LOWER_UDF_CLASS, FunctionLanguage.JAVA, resourceUris), + false); + + // There will be three resources for temporary and system functions without catalog + // function. + assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(3); + // Drop catalog function will not decrease the reference count of resourceUris. + functionCatalog.dropCatalogFunction(FULL_UNRESOLVED_IDENTIFIER1, false); + // There will be three resources for temporary and system functions. + assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(3); + + functionCatalog.dropTemporaryCatalogFunction(FULL_UNRESOLVED_IDENTIFIER2, false); + assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(2); + + functionCatalog.dropTemporaryCatalogFunction(FULL_UNRESOLVED_IDENTIFIER3, false); + assertThat(functionResourceInfos.get(resourceUri).counter).isEqualTo(1); + + functionCatalog.dropTemporarySystemFunction(GENERATED_LOWER_UDF_CLASS, false); + assertThat(functionResourceInfos.containsKey(resourceUri)).isFalse(); + } + @Test public void testCloseResourceManagerCleanDownloadedResources() throws Exception { resourceManager.close(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java index 0daa554d6e6..e94d12d7ee9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java @@ -52,6 +52,7 @@ import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.table.types.inference.TypeStrategies; import org.apache.flink.table.types.logical.RawType; import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; @@ -229,7 +230,7 @@ public class FunctionITCase extends StreamingTestBase { } @Test - public void testCreateTemporarySystemFunctionByUsingJar() { + public void testCreateTemporarySystemFunctionByUsingJar() throws Exception { String ddl = String.format( "CREATE TEMPORARY SYSTEM FUNCTION f10 AS '%s' USING JAR '%s'", @@ -237,6 +238,10 @@ public class FunctionITCase extends StreamingTestBase { tEnv().executeSql(ddl); assertThat(Arrays.asList(tEnv().listFunctions())).contains("f10"); + try (CloseableIterator<Row> itor = tEnv().executeSql("SHOW JARS").collect()) { + assertThat(itor.hasNext()).isFalse(); + } + tEnv().executeSql("DROP TEMPORARY SYSTEM FUNCTION f10"); assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f10"); }