This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ce477c31562 [FLINK-39081] Add ARTIFACT keyword option in CREATE 
FUNCTION's USING clause
ce477c31562 is described below

commit ce477c31562dcd732e73359e81141b8628fab11c
Author: Mika Naylor <[email protected]>
AuthorDate: Fri Feb 13 14:28:50 2026 +0100

    [FLINK-39081] Add ARTIFACT keyword option in CREATE FUNCTION's USING clause
---
 .../sql/hive-compatibility/hive-dialect/create.md  |   6 +-
 docs/content.zh/docs/sql/reference/ddl/create.md   |   2 +-
 .../sql/hive-compatibility/hive-dialect/create.md  |   6 +-
 docs/content/docs/sql/reference/ddl/create.md      |   8 +-
 .../table/client/gateway/SingleSessionManager.java |   5 +-
 .../client/resource/ClientResourceManager.java     |  27 +++--
 .../src/main/codegen/data/Parser.tdd               |   2 +
 .../src/main/codegen/includes/parserImpls.ftl      |  12 ++-
 .../sql/parser/ddl/resource/SqlResourceType.java   |   3 +-
 .../flink/sql/parser/utils/ParserResource.java     |   5 +-
 .../ParserResource.properties                      |   2 +-
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 117 ++++++++++++++-------
 .../flink/table/catalog/FunctionCatalog.java       |  21 ++--
 .../table/operations/command/AddJarOperation.java  |   2 +-
 .../flink/table/resource/ResourceManager.java      |  76 +++++++++----
 .../flink/table/resource/ResourceManagerTest.java  |  53 ++++++----
 .../apache/flink/table/resource/ResourceType.java  |   3 +-
 .../planner/utils/OperationConverterUtils.java     |   3 +
 .../batch/sql/PartitionableSourceITCase.scala      |   2 +-
 19 files changed, 240 insertions(+), 115 deletions(-)

diff --git a/docs/content.zh/docs/sql/hive-compatibility/hive-dialect/create.md 
b/docs/content.zh/docs/sql/hive-compatibility/hive-dialect/create.md
index ae357260f70..ef930f6687b 100644
--- a/docs/content.zh/docs/sql/hive-compatibility/hive-dialect/create.md
+++ b/docs/content.zh/docs/sql/hive-compatibility/hive-dialect/create.md
@@ -212,7 +212,7 @@ CREATE TEMPORARY MACRO simple_add (x int, y int) x + y;
 #### Create Temporary Function
 
 ```sql
-CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR 'file_uri'];
+CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR|ARTIFACT 
'file_uri'];
 ```
 
 The function exists for the duration of the current session.
@@ -221,12 +221,12 @@ The function exists for the duration of the current 
session.
 
 ```sql
 CREATE FUNCTION [db_name.]function_name AS class_name
-  [USING JAR 'file_uri'];
+  [USING [JAR|ARTIFACT] 'file_uri'];
 ```
 The function is registered to metastore and will exist in all session unless 
the function is dropped.
 
 ### Parameter
-- `[USING JAR 'file_uri']`
+- `[USING [JAR|ARTIFACT] 'file_uri']`
 
   User can use the clause to add Jar that contains the implementation of the 
function along with its dependencies while creating the function.
   The `file_uri` can be on local file or distributed file system.
diff --git a/docs/content.zh/docs/sql/reference/ddl/create.md 
b/docs/content.zh/docs/sql/reference/ddl/create.md
index e5314fec71e..cffd9d689a6 100644
--- a/docs/content.zh/docs/sql/reference/ddl/create.md
+++ b/docs/content.zh/docs/sql/reference/ddl/create.md
@@ -876,7 +876,7 @@ CREATE [TEMPORARY] VIEW [IF NOT EXISTS] 
[catalog_name.][db_name.]view_name
 CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
   [IF NOT EXISTS] [[catalog_name.]db_name.]function_name
   AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
-  [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]
+  [USING [JAR|ARTIFACT] '<path_to_filename>.jar' [, JAR 
'<path_to_filename>.jar']* ]
   [WITH (key1=val1, key2=val2, ...)]
 ```
 
diff --git a/docs/content/docs/sql/hive-compatibility/hive-dialect/create.md 
b/docs/content/docs/sql/hive-compatibility/hive-dialect/create.md
index 356b4dd98fe..a2bc7a7a4d3 100644
--- a/docs/content/docs/sql/hive-compatibility/hive-dialect/create.md
+++ b/docs/content/docs/sql/hive-compatibility/hive-dialect/create.md
@@ -212,7 +212,7 @@ CREATE TEMPORARY MACRO simple_add (x int, y int) x + y;
 #### Create Temporary Function
 
 ```sql
-CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR 'file_uri'];
+CREATE TEMPORARY FUNCTION function_name AS class_name [USING [JAR|ARTIFACT] 
'file_uri'];
 ```
 
 The function exists for the duration of the current session.
@@ -221,12 +221,12 @@ The function exists for the duration of the current 
session.
 
 ```sql
 CREATE FUNCTION [db_name.]function_name AS class_name
-  [USING JAR 'file_uri'];
+  [USING [JAR|ARTIFACT] 'file_uri'];
 ```
 The function is registered to metastore and will exist in all session unless 
the function is dropped.
 
 ### Parameter
-- `[USING JAR 'file_uri']`
+- `[USING [JAR|ARTIFACT] 'file_uri']`
 
   User can use the clause to add Jar that contains the implementation of the 
function along with its dependencies while creating the function.
   The `file_uri` can be on local file or distributed file system.
diff --git a/docs/content/docs/sql/reference/ddl/create.md 
b/docs/content/docs/sql/reference/ddl/create.md
index fff1f392b78..b72fa7a2dea 100644
--- a/docs/content/docs/sql/reference/ddl/create.md
+++ b/docs/content/docs/sql/reference/ddl/create.md
@@ -857,10 +857,10 @@ If the view already exists, nothing happens.
 
 ## CREATE FUNCTION
 ```sql
-CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION 
-  [IF NOT EXISTS] [catalog_name.][db_name.]function_name 
-  AS identifier [LANGUAGE JAVA|SCALA|PYTHON] 
-  [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]
+CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
+  [IF NOT EXISTS] [catalog_name.][db_name.]function_name
+  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
+  [USING [JAR|ARTIFACT] '<path_to_filename>.jar' [, JAR 
'<path_to_filename>.jar']* ]
   [WITH (key1=val1, key2=val2, ...)]
 ```
 
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
index 99dd4e72ed2..2c1cb101209 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
@@ -188,7 +188,7 @@ public class SingleSessionManager implements SessionManager 
{
             ClientResourceManager resourceManager =
                     new ClientResourceManager(configuration, userClassLoader);
             try {
-                resourceManager.registerJarResources(
+                resourceManager.registerResources(
                         dependencies.stream()
                                 .map(uri -> new ResourceUri(ResourceType.JAR, 
uri.toString()))
                                 .collect(Collectors.toList()));
@@ -221,7 +221,8 @@ public class SingleSessionManager implements SessionManager 
{
         protected ResultFetcher callRemoveJar(OperationHandle operationHandle, 
String jarPath) {
             URL jarURL =
                     ((ClientResourceManager) 
sessionContext.getSessionState().resourceManager)
-                            .unregisterJarResource(jarPath);
+                            .unregisterResource(
+                                    jarPath, List.of(ResourceType.JAR, 
ResourceType.ARTIFACT));
             if (jarURL != null) {
                 ((ClientWrapperClassLoader)
                                 sessionContext
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
index 13e64b6f499..df9aa45e39a 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
@@ -31,6 +31,9 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * The {@link ClientResourceManager} is able to remove the registered JAR 
resources with the
@@ -48,15 +51,27 @@ public class ClientResourceManager extends ResourceManager {
     }
 
     @Nullable
-    public URL unregisterJarResource(String jarPath) {
-        Path path = new Path(jarPath);
+    public URL unregisterResource(String resourcePath, List<ResourceType> 
expectedTypes) {
+        Path path = new Path(resourcePath);
         try {
-            checkPath(path, ResourceType.JAR);
-            return resourceInfos.remove(
-                    new ResourceUri(ResourceType.JAR, 
getURLFromPath(path).getPath()));
+            checkPath(path, expectedTypes);
+            String urlPath = getURLFromPath(path).getPath();
+
+            return expectedTypes.stream()
+                    .map(type -> resourceInfos.remove(new ResourceUri(type, 
urlPath)))
+                    .filter(Objects::nonNull)
+                    .findFirst()
+                    .orElse(null);
         } catch (IOException e) {
             throw new SqlExecutionException(
-                    String.format("Failed to unregister the jar resource 
[%s]", jarPath), e);
+                    String.format(
+                            "Failed to unregister the %s resource [%s]",
+                            expectedTypes.stream()
+                                    .map(ResourceType::name)
+                                    .map(String::toLowerCase)
+                                    .collect(Collectors.joining(" or ")),
+                            resourcePath),
+                    e);
         }
     }
 }
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index f0582dfe31e..c6269db8c83 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -194,6 +194,7 @@
   # Please keep the keyword in alphabetical order if new keyword is added.
   keywords: [
     "ANALYZE"
+    "ARTIFACT"
     "BUCKETS"
     "BYTES"
     "CATALOGS"
@@ -273,6 +274,7 @@
     "ALWAYS"
     "APPLY"
     "ARRAY_AGG"
+    "ARTIFACT"
     "ASC"
     "ASSERTION"
     "ASSIGNMENT"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index ad370a6db62..cdf2767a212 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -450,7 +450,7 @@ SqlCreate SqlCreateFunction(Span s, boolean replace, 
boolean isTemporary) :
             if ("SQL".equals(functionLanguage) || 
"PYTHON".equals(functionLanguage)) {
                 throw SqlUtil.newContextException(
                     functionLanguagePos,
-                    
ParserResource.RESOURCE.createFunctionUsingJar(functionLanguage));
+                    ParserResource.RESOURCE.createFunction(functionLanguage));
             }
             List<SqlNode> resourceList = new ArrayList<SqlNode>();
             SqlResource sqlResource = null;
@@ -492,13 +492,21 @@ SqlResource SqlResourceInfo() :
     String resourcePath;
 }
 {
-    <JAR> <QUOTED_STRING> {
+    (<JAR> <QUOTED_STRING> {
         resourcePath = SqlParserUtil.parseString(token.image);
         return new SqlResource(
                     getPos(),
                     SqlResourceType.JAR.symbol(getPos()),
                     SqlLiteral.createCharString(resourcePath, getPos()));
     }
+    |
+    <ARTIFACT> <QUOTED_STRING> {
+        resourcePath = SqlParserUtil.parseString(token.image);
+        return new SqlResource(
+                    getPos(),
+                    SqlResourceType.ARTIFACT.symbol(getPos()),
+                    SqlLiteral.createCharString(resourcePath, getPos()));
+    })
 }
 
 SqlDrop SqlDropFunction(Span s, boolean replace, boolean isTemporary) :
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
index 87aaea34fc1..cb33fdae875 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
@@ -25,7 +25,8 @@ import org.apache.calcite.sql.parser.SqlParserPos;
 public enum SqlResourceType {
     FILE("FILE"),
     JAR("JAR"),
-    ARCHIVE("ARCHIVE");
+    ARCHIVE("ARCHIVE"),
+    ARTIFACT("ARTIFACT");
 
     private final String digest;
 
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 88a960259c3..323fe5da0aa 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -57,8 +57,9 @@ public interface ParserResource {
             "Columns identifiers without types in the schema are supported on 
CTAS/RTAS statements only.")
     Resources.ExInst<ParseException> columnsIdentifiersUnsupported();
 
-    @Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable 
to {0} language.")
-    Resources.ExInst<ParseException> createFunctionUsingJar(String language);
+    @Resources.BaseMessage(
+            "CREATE FUNCTION USING JAR/ARTIFACT syntax is not applicable to 
{0} language.")
+    Resources.ExInst<ParseException> createFunction(String language);
 
     @Resources.BaseMessage("WITH DRAIN could only be used after WITH 
SAVEPOINT.")
     Resources.ExInst<ParseException> withDrainOnlyUsedWithSavepoint();
diff --git 
a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
 
b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
index daf0b1c899a..5cd2982867d 100644
--- 
a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
+++ 
b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
@@ -20,4 +20,4 @@ MultipleWatermarksUnsupported=Multiple WATERMARK declarations 
are not supported
 OverwriteIsOnlyUsedWithInsert=OVERWRITE expression is only used with INSERT 
statement.
 createSystemFunctionOnlySupportTemporary=CREATE SYSTEM FUNCTION is not 
supported, system functions can only be registered as temporary function, you 
can use CREATE TEMPORARY SYSTEM FUNCTION instead.
 explainDetailIsDuplicate=Duplicate EXPLAIN DETAIL is not allowed.
-createFunctionUsingJar=CREATE FUNCTION USING JAR syntax is not applicable to 
{0} language.
+createFunction=CREATE FUNCTION USING JAR/ARTIFACT syntax is not applicable to 
{0} language.
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index a631156f387..a65ad52c566 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import java.util.List;
 import java.util.Locale;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -2509,49 +2510,91 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                                 + "system functions can only be registered as 
temporary "
                                 + "functions, you can use CREATE TEMPORARY 
SYSTEM FUNCTION instead.");
 
-        // test create function using jar
-        sql("create temporary function function1 as 
'org.apache.flink.function.function1' language java using jar 
'file:///path/to/test.jar'")
-                .ok(
-                        "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 
'file:///path/to/test.jar'");
-
-        sql("create temporary function function1 as 
'org.apache.flink.function.function1' language scala using jar 
'/path/to/test.jar'")
-                .ok(
-                        "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE SCALA USING JAR 
'/path/to/test.jar'");
-
-        sql("create temporary system function function1 as 
'org.apache.flink.function.function1' language scala using jar 
'/path/to/test.jar'")
-                .ok(
-                        "CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE SCALA USING JAR 
'/path/to/test.jar'");
-
-        sql("create function function1 as 
'org.apache.flink.function.function1' language java using jar 
'file:///path/to/test.jar', jar 'hdfs:///path/to/test2.jar'")
-                .ok(
-                        "CREATE FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 
'file:///path/to/test.jar', JAR 'hdfs:///path/to/test2.jar'");
-
-        sql("create temporary function function1 as 
'org.apache.flink.function.function1' language ^sql^ using jar 
'file:///path/to/test.jar'")
-                .fails("CREATE FUNCTION USING JAR syntax is not applicable to 
SQL language.");
-
-        sql("create temporary function function1 as 
'org.apache.flink.function.function1' language ^python^ using jar 
'file:///path/to/test.jar'")
-                .fails("CREATE FUNCTION USING JAR syntax is not applicable to 
PYTHON language.");
+        // test creating functions with either jar or artifact
+        for (String usageType : List.of("JAR", "ARTIFACT")) {
+            sql("create temporary function function1 as 
'org.apache.flink.function.function1' language java using "
+                            + usageType
+                            + " 'file:///path/to/test.jar'")
+                    .ok(
+                            "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING "
+                                    + usageType
+                                    + " 'file:///path/to/test.jar'");
+
+            sql("create temporary function function1 as 
'org.apache.flink.function.function1' language scala using "
+                            + usageType
+                            + " '/path/to/test.jar'")
+                    .ok(
+                            "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE SCALA USING "
+                                    + usageType
+                                    + " '/path/to/test.jar'");
+
+            sql("create temporary system function function1 as 
'org.apache.flink.function.function1' language scala using "
+                            + usageType
+                            + " '/path/to/test.jar'")
+                    .ok(
+                            "CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE SCALA USING "
+                                    + usageType
+                                    + " '/path/to/test.jar'");
+
+            sql("create function function1 as 
'org.apache.flink.function.function1' language java using "
+                            + usageType
+                            + " 'file:///path/to/test.jar', jar 
'hdfs:///path/to/test2.jar'")
+                    .ok(
+                            "CREATE FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING "
+                                    + usageType
+                                    + " 'file:///path/to/test.jar', JAR 
'hdfs:///path/to/test2.jar'");
+
+            sql("create temporary function function1 as 
'org.apache.flink.function.function1' language ^sql^ using "
+                            + usageType
+                            + " 'file:///path/to/test.jar'")
+                    .fails(
+                            "CREATE FUNCTION USING JAR/ARTIFACT syntax is not 
applicable to SQL language.");
+
+            sql("create temporary function function1 as 
'org.apache.flink.function.function1' language ^python^ using "
+                            + usageType
+                            + " 'file:///path/to/test.jar'")
+                    .fails(
+                            "CREATE FUNCTION USING JAR/ARTIFACT syntax is not 
applicable to PYTHON language.");
+
+            sql("create function function1 as 
'org.apache.flink.function.function1' language java using "
+                            + usageType
+                            + " 'file:///path/to/test.jar' WITH ('k1' = 'v1', 
'k2' = 'v2')")
+                    .ok(
+                            "CREATE FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING "
+                                    + usageType
+                                    + " 'file:///path/to/test.jar'\nWITH (\n"
+                                    + "  'k1' = 'v1',\n"
+                                    + "  'k2' = 'v2'\n"
+                                    + ")");
+
+            sql("create temporary function function1 as 
'org.apache.flink.function.function1' language java using "
+                            + usageType
+                            + " 'file:///path/to/test.jar' WITH ('k1' = 'v1', 
'k2' = 'v2')")
+                    .ok(
+                            "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING "
+                                    + usageType
+                                    + " 'file:///path/to/test.jar'\nWITH (\n"
+                                    + "  'k1' = 'v1',\n"
+                                    + "  'k2' = 'v2'\n"
+                                    + ")");
+        }
+
+        // test mixing jar and artifact keywords
+        sql("create function function1 as 
'org.apache.flink.function.function1' language java using jar 
'file:///path/to/test.jar', artifact 'hdfs:///path/to/test2.jar'")
+                .ok(
+                        "CREATE FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 
'file:///path/to/test.jar', ARTIFACT 'hdfs:///path/to/test2.jar'");
+
+        sql("create function function1 as 
'org.apache.flink.function.function1' language java using artifact 
'file:///path/to/test.jar', jar 'hdfs:///path/to/test2.jar'")
+                .ok(
+                        "CREATE FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING ARTIFACT 
'file:///path/to/test.jar', JAR 'hdfs:///path/to/test2.jar'");
 
         sql("create temporary function function1 as 
'org.apache.flink.function.function1' language java using ^file^ 
'file:///path/to/test'")
                 .fails(
                         "Encountered \"file\" at line 1, column 98.\n"
-                                + "Was expecting:\n"
+                                + "Was expecting one of:\n"
+                                + "    \"ARTIFACT\" ...\n"
                                 + "    \"JAR\" ...\n"
                                 + "    .*");
-
-        sql("create function function1 as 
'org.apache.flink.function.function1' language java using jar 
'file:///path/to/test.jar' WITH ('k1' = 'v1', 'k2' = 'v2')")
-                .ok(
-                        "CREATE FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 
'file:///path/to/test.jar'\nWITH (\n"
-                                + "  'k1' = 'v1',\n"
-                                + "  'k2' = 'v2'\n"
-                                + ")");
-
-        sql("create temporary function function1 as 
'org.apache.flink.function.function1' language java using jar 
'file:///path/to/test.jar' WITH ('k1' = 'v1', 'k2' = 'v2')")
-                .ok(
-                        "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR 
'file:///path/to/test.jar'\nWITH (\n"
-                                + "  'k1' = 'v1',\n"
-                                + "  'k2' = 'v2'\n"
-                                + ")");
     }
 
     @Test
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index d0d72467c33..ccc5495c2b0 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -141,12 +141,12 @@ public final class FunctionCatalog {
                             "Could not drop temporary system function. A 
function named '%s' doesn't exist.",
                             name));
         }
-        unregisterFunctionJarResources(function);
+        unregisterFunctionResources(function);
 
         return function != null;
     }
 
-    private void unregisterFunctionJarResources(@Nullable CatalogFunction 
function) {
+    private void unregisterFunctionResources(@Nullable CatalogFunction 
function) {
         if (function != null && function.getFunctionLanguage() == 
FunctionLanguage.JAVA) {
             
resourceManager.unregisterFunctionResources(function.getFunctionResources());
         }
@@ -509,7 +509,7 @@ public final class FunctionCatalog {
                     .getTemporaryOperationListener(normalizedName)
                     .ifPresent(l -> 
l.onDropTemporaryFunction(normalizedName.toObjectPath()));
             tempCatalogFunctions.remove(normalizedName);
-            unregisterFunctionJarResources(fd);
+            unregisterFunctionResources(fd);
         } else if (!ignoreIfNotExist) {
             throw new ValidationException(
                     String.format("Temporary catalog function %s doesn't 
exist", identifier));
@@ -601,8 +601,7 @@ public final class FunctionCatalog {
         CatalogFunction potentialResult = 
tempCatalogFunctions.get(normalizedIdentifier);
 
         if (potentialResult != null) {
-            registerFunctionJarResources(
-                    oi.asSummaryString(), 
potentialResult.getFunctionResources());
+            registerFunctionResources(oi.asSummaryString(), 
potentialResult.getFunctionResources());
             return Optional.of(
                     ContextResolvedFunction.temporary(
                             FunctionIdentifier.of(oi),
@@ -622,7 +621,7 @@ public final class FunctionCatalog {
                 FunctionDefinition fd;
                 if (catalog.getFunctionDefinitionFactory().isPresent()
                         && catalogFunction.getFunctionLanguage() != 
FunctionLanguage.PYTHON) {
-                    registerFunctionJarResources(
+                    registerFunctionResources(
                             oi.asSummaryString(), 
catalogFunction.getFunctionResources());
                     fd =
                             catalog.getFunctionDefinitionFactory()
@@ -656,7 +655,7 @@ public final class FunctionCatalog {
         String normalizedName = FunctionIdentifier.normalizeName(funcName);
         if (tempSystemFunctions.containsKey(normalizedName)) {
             CatalogFunction function = tempSystemFunctions.get(normalizedName);
-            registerFunctionJarResources(funcName, 
function.getFunctionResources());
+            registerFunctionResources(funcName, 
function.getFunctionResources());
             return Optional.of(
                     ContextResolvedFunction.temporary(
                             FunctionIdentifier.of(funcName),
@@ -732,7 +731,7 @@ public final class FunctionCatalog {
         }
         // If the jar resource of UDF used is not empty, register it to 
classloader before
         // validate.
-        registerFunctionJarResources(name, function.getFunctionResources());
+        registerFunctionResources(name, function.getFunctionResources());
 
         return UserDefinedFunctionHelper.instantiateFunction(
                 resourceManager.getUserClassLoader(),
@@ -742,15 +741,15 @@ public final class FunctionCatalog {
                 function);
     }
 
-    public void registerFunctionJarResources(String functionName, 
List<ResourceUri> resourceUris) {
+    public void registerFunctionResources(String functionName, 
List<ResourceUri> resourceUris) {
         try {
             if (!resourceUris.isEmpty()) {
-                resourceManager.registerJarResources(resourceUris);
+                resourceManager.registerResources(resourceUris);
             }
         } catch (Exception e) {
             throw new TableException(
                     String.format(
-                            "Failed to register jar resource '%s' of function 
'%s'.",
+                            "Failed to register resource '%s' of function 
'%s'.",
                             resourceUris, functionName),
                     e);
         }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
index 03f40658bd0..02638cbfa77 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
@@ -53,7 +53,7 @@ public class AddJarOperation implements Operation, 
ExecutableOperation {
     public TableResultInternal execute(Context ctx) {
         ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, getPath());
         try {
-            
ctx.getResourceManager().registerJarResources(Collections.singletonList(resourceUri));
+            
ctx.getResourceManager().registerResources(Collections.singletonList(resourceUri));
             return TableResultImpl.TABLE_RESULT_OK;
         } catch (IOException e) {
             throw new TableException(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
index d2a71edaf1f..4edfacf09ae 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
@@ -118,20 +118,21 @@ public class ResourceManager implements Closeable {
      * Due to anyone of the resource in list maybe fail during register, so we 
should stage it
      * before actual register to guarantee transaction process. If all the 
resources are available,
      * register them into the {@link ResourceManager}.
+     *
+     * <p>Accepts both JAR and ARTIFACT resources.
      */
-    public void registerJarResources(List<ResourceUri> resourceUris) throws 
IOException {
+    public void registerResources(List<ResourceUri> resourceUris) throws 
IOException {
         registerResources(
                 prepareStagingResources(
                         resourceUris,
-                        ResourceType.JAR,
+                        List.of(ResourceType.JAR, ResourceType.ARTIFACT),
                         true,
                         url -> {
                             try {
                                 JarUtils.checkJarFile(url);
                             } catch (IOException e) {
                                 throw new ValidationException(
-                                        String.format("Failed to register jar 
resource [%s]", url),
-                                        e);
+                                        String.format("Failed to register 
resource [%s]", url), e);
                             }
                         },
                         false),
@@ -171,7 +172,7 @@ public class ResourceManager implements Closeable {
     public void declareFunctionResources(Set<ResourceUri> resourceUris) throws 
IOException {
         prepareStagingResources(
                 resourceUris,
-                ResourceType.JAR,
+                List.of(ResourceType.JAR, ResourceType.ARTIFACT),
                 true,
                 url -> {
                     try {
@@ -235,7 +236,11 @@ public class ResourceManager implements Closeable {
      */
     public Set<URL> getLocalJarResources() {
         return resourceInfos.entrySet().stream()
-                .filter(entry -> 
ResourceType.JAR.equals(entry.getKey().getResourceType()))
+                .filter(
+                        entry ->
+                                
ResourceType.JAR.equals(entry.getKey().getResourceType())
+                                        || ResourceType.ARTIFACT.equals(
+                                                
entry.getKey().getResourceType()))
                 .map(Map.Entry::getValue)
                 .collect(Collectors.toSet());
     }
@@ -340,29 +345,38 @@ public class ResourceManager implements Closeable {
     // ------------------------------------------------------------------------
 
     protected void checkPath(Path path, ResourceType expectedType) throws 
IOException {
+        checkPath(path, Collections.singletonList(expectedType));
+    }
+
+    protected void checkPath(Path path, List<ResourceType> expectedTypes) 
throws IOException {
         FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
         // check resource exists firstly
         if (!fs.exists(path)) {
             throw new FileNotFoundException(
                     String.format(
                             "%s resource [%s] not found.",
-                            expectedType.name().toLowerCase(), path));
+                            expectedTypes.stream()
+                                    .map(ResourceType::name)
+                                    .map(String::toLowerCase)
+                                    .collect(Collectors.joining(" or ")),
+                            path));
         }
         // register directory is not allowed for resource
         if (fs.getFileStatus(path).isDir()) {
             throw new ValidationException(
                     String.format(
-                            "The registering or unregistering %s resource [%s] 
is a directory that is not allowed.",
-                            expectedType.name().toLowerCase(), path));
+                            "The registering or unregistering resource [%s] is 
a directory, which is not allowed.",
+                            path));
         }
 
-        if (expectedType == ResourceType.JAR) {
+        if (expectedTypes.contains(ResourceType.JAR)
+                || expectedTypes.contains(ResourceType.ARTIFACT)) {
             // file name should end with .jar suffix
             String fileExtension = Files.getFileExtension(path.getName());
             if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
                 throw new ValidationException(
                         String.format(
-                                "The registering or unregistering jar resource 
[%s] must ends with '.jar' suffix.",
+                                "The registering or unregistering resource 
[%s] must end with a '.jar' suffix.",
                                 path));
             }
         }
@@ -457,19 +471,24 @@ public class ResourceManager implements Closeable {
         return Collections.emptyMap();
     }
 
-    private void checkResources(Collection<ResourceUri> resourceUris, 
ResourceType expectedType)
+    private void checkResources(
+            Collection<ResourceUri> resourceUris, List<ResourceType> 
expectedTypes)
             throws IOException {
-        // check the resource type
+        // check the resource types
         if (resourceUris.stream()
-                .anyMatch(resourceUri -> expectedType != 
resourceUri.getResourceType())) {
+                .anyMatch(resourceUri -> 
!expectedTypes.contains(resourceUri.getResourceType()))) {
             throw new ValidationException(
                     String.format(
                             "Expect the resource type to be %s, but encounter 
a resource %s.",
-                            expectedType.name().toLowerCase(),
+                            expectedTypes.stream()
+                                    .map(ResourceType::name)
+                                    .map(String::toLowerCase)
+                                    .collect(Collectors.joining(" or ")),
                             resourceUris.stream()
                                     .filter(
                                             resourceUri ->
-                                                    expectedType != 
resourceUri.getResourceType())
+                                                    !expectedTypes.contains(
+                                                            
resourceUri.getResourceType()))
                                     .findFirst()
                                     .map(
                                             resourceUri ->
@@ -485,7 +504,7 @@ public class ResourceManager implements Closeable {
 
         // check the resource path
         for (ResourceUri resourceUri : resourceUris) {
-            checkPath(new Path(resourceUri.getUri()), expectedType);
+            checkPath(new Path(resourceUri.getUri()), expectedTypes);
         }
     }
 
@@ -496,7 +515,22 @@ public class ResourceManager implements Closeable {
             Consumer<URL> resourceChecker,
             boolean declareFunctionResource)
             throws IOException {
-        checkResources(resourceUris, expectedType);
+        return prepareStagingResources(
+                resourceUris,
+                Collections.singletonList(expectedType),
+                executable,
+                resourceChecker,
+                declareFunctionResource);
+    }
+
+    private Map<ResourceUri, URL> prepareStagingResources(
+            Collection<ResourceUri> resourceUris,
+            List<ResourceType> expectedTypes,
+            boolean executable,
+            Consumer<URL> resourceChecker,
+            boolean declareFunctionResource)
+            throws IOException {
+        checkResources(resourceUris, expectedTypes);
 
         Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>();
         boolean supportOverwrite = !executable;
@@ -514,7 +548,8 @@ public class ResourceManager implements Closeable {
 
             URL localUrl;
             ResourceUri localResourceUri = resourceUri;
-            if (expectedType == ResourceType.JAR
+            if ((expectedTypes.contains(ResourceType.JAR)
+                            || expectedTypes.contains(ResourceType.ARTIFACT))
                     && functionResourceInfos.containsKey(resourceUri)) {
                 // Get local url from function resource infos.
                 localUrl = functionResourceInfos.get(resourceUri).url;
@@ -532,7 +567,8 @@ public class ResourceManager implements Closeable {
                     localUrl = getURLFromPath(path);
                     // if the local resource is a relative path, here convert 
it to an absolute path
                     // before register
-                    localResourceUri = new ResourceUri(expectedType, 
localUrl.getPath());
+                    localResourceUri =
+                            new ResourceUri(resourceUri.getResourceType(), 
localUrl.getPath());
                 }
 
                 // check the local file
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
index 0b14e7f8de6..d1bea7f2c77 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
@@ -55,6 +55,7 @@ import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -84,6 +85,8 @@ public class ResourceManagerTest {
     @TempDir private static File tempFolder;
     private static File udfJar;
 
+    private static File udfArtifact;
+
     private static File file;
 
     private ResourceManager resourceManager;
@@ -93,7 +96,13 @@ public class ResourceManagerTest {
         udfJar =
                 UserClassLoaderJarTestUtils.createJarFile(
                         tempFolder,
-                        "test-classloader-udf.jar",
+                        "test-classloader-udf-jar.jar",
+                        GENERATED_LOWER_UDF_CLASS,
+                        String.format(GENERATED_LOWER_UDF_CODE, 
GENERATED_LOWER_UDF_CLASS));
+        udfArtifact =
+                UserClassLoaderJarTestUtils.createJarFile(
+                        tempFolder,
+                        "test-classloader-udf-artifact.jar",
                         GENERATED_LOWER_UDF_CLASS,
                         String.format(GENERATED_LOWER_UDF_CODE, 
GENERATED_LOWER_UDF_CLASS));
         file = File.createTempFile("ResourceManagerTest", ".txt", tempFolder);
@@ -114,7 +123,7 @@ public class ResourceManagerTest {
     }
 
     @Test
-    public void testRegisterJarResource() throws Exception {
+    public void testRegisterResources() throws Exception {
         URLClassLoader userClassLoader = resourceManager.getUserClassLoader();
 
         // test class loading before register resource
@@ -123,14 +132,20 @@ public class ResourceManagerTest {
                 ClassNotFoundException.class,
                 () -> Class.forName(GENERATED_LOWER_UDF_CLASS, false, 
userClassLoader));
 
-        ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, 
udfJar.getPath());
-        // register the same jar repeatedly
-        resourceManager.registerJarResources(Arrays.asList(resourceUri, 
resourceUri));
+        ResourceUri jarResourceUri = new ResourceUri(ResourceType.JAR, 
udfJar.getPath());
+        ResourceUri artifactResourceUri =
+                new ResourceUri(ResourceType.ARTIFACT, udfArtifact.getPath());
+
+        // register the same artifacts repeatedly
+        resourceManager.registerResources(Arrays.asList(jarResourceUri, 
artifactResourceUri));
+        resourceManager.registerResources(Arrays.asList(jarResourceUri, 
artifactResourceUri));
 
         // assert resource infos
-        Map<ResourceUri, URL> expected =
-                Collections.singletonMap(
-                        resourceUri, resourceManager.getURLFromPath(new 
Path(udfJar.getPath())));
+        Map<ResourceUri, URL> expected = new HashMap<>();
+        expected.put(jarResourceUri, resourceManager.getURLFromPath(new 
Path(udfJar.getPath())));
+        expected.put(
+                artifactResourceUri,
+                resourceManager.getURLFromPath(new 
Path(udfArtifact.getPath())));
 
         assertEquals(expected, resourceManager.getResources());
 
@@ -199,7 +214,7 @@ public class ResourceManagerTest {
                                 .relativize(udfJar.toPath())
                                 .toString());
         // register jar
-        
resourceManager.registerJarResources(Collections.singletonList(resourceUri));
+        
resourceManager.registerResources(Collections.singletonList(resourceUri));
 
         // assert resource infos
         Map<ResourceUri, URL> expected =
@@ -217,16 +232,16 @@ public class ResourceManagerTest {
     }
 
     @Test
-    public void testRegisterInvalidJarResource() throws Exception {
+    public void testRegisterInvalidResource() throws Exception {
         final String fileUri = file.getPath();
 
         CommonTestUtils.assertThrows(
                 String.format(
-                        "Expect the resource type to be jar, but encounter a 
resource [%s] with type %s.",
+                        "Expect the resource type to be jar or artifact, but 
encounter a resource [%s] with type %s.",
                         fileUri, ResourceType.FILE.name().toLowerCase()),
                 ValidationException.class,
                 () -> {
-                    resourceManager.registerJarResources(
+                    resourceManager.registerResources(
                             Collections.singletonList(new 
ResourceUri(ResourceType.FILE, fileUri)));
                     return null;
                 });
@@ -234,11 +249,11 @@ public class ResourceManagerTest {
         // test register jar resource with invalid suffix
         CommonTestUtils.assertThrows(
                 String.format(
-                        "The registering or unregistering jar resource [%s] 
must ends with '.jar' suffix.",
+                        "The registering or unregistering resource [%s] must 
end with a '.jar' suffix.",
                         fileUri),
                 ValidationException.class,
                 () -> {
-                    resourceManager.registerJarResources(
+                    resourceManager.registerResources(
                             Collections.singletonList(new 
ResourceUri(ResourceType.JAR, fileUri)));
                     return null;
                 });
@@ -248,11 +263,11 @@ public class ResourceManagerTest {
 
         CommonTestUtils.assertThrows(
                 String.format(
-                        "The registering or unregistering jar resource [%s] is 
a directory that is not allowed.",
+                        "The registering or unregistering resource [%s] is a 
directory, which is not allowed.",
                         jarDir),
                 ValidationException.class,
                 () -> {
-                    resourceManager.registerJarResources(
+                    resourceManager.registerResources(
                             Collections.singletonList(new 
ResourceUri(ResourceType.JAR, jarDir)));
                     return null;
                 });
@@ -263,11 +278,11 @@ public class ResourceManagerTest {
 
         CommonTestUtils.assertThrows(
                 String.format(
-                        "The registering or unregistering jar resource [%s] is 
a directory that is not allowed.",
+                        "The registering or unregistering resource [%s] is a 
directory, which is not allowed.",
                         jarPath),
                 ValidationException.class,
                 () -> {
-                    resourceManager.registerJarResources(
+                    resourceManager.registerResources(
                             Collections.singletonList(new 
ResourceUri(ResourceType.JAR, jarPath)));
                     return null;
                 });
@@ -446,7 +461,7 @@ public class ResourceManagerTest {
     void testCloseCopiedResourceManager() throws Exception {
         ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, 
udfJar.getPath());
         
resourceManager.declareFunctionResources(Collections.singleton(resourceUri));
-        
resourceManager.registerJarResources(Collections.singletonList(resourceUri));
+        
resourceManager.registerResources(Collections.singletonList(resourceUri));
         
assertThat(resourceManager.functionResourceInfos().size()).isEqualTo(1);
         assertThat(resourceManager.resourceInfos.size()).isEqualTo(1);
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
index 15bbdd6656e..5f81b6d1e58 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
@@ -25,5 +25,6 @@ import org.apache.flink.annotation.PublicEvolving;
 public enum ResourceType {
     FILE,
     JAR,
-    ARCHIVE
+    ARCHIVE,
+    ARTIFACT,
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 82a3a15724a..9bfa2dd7549 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -196,6 +196,9 @@ public class OperationConverterUtils {
                                 case JAR:
                                     resourceType = ResourceType.JAR;
                                     break;
+                                case ARTIFACT:
+                                    resourceType = ResourceType.ARTIFACT;
+                                    break;
                                 case ARCHIVE:
                                     resourceType = ResourceType.ARCHIVE;
                                     break;
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
index 9f657eddb72..832c4f7a9c1 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
@@ -203,7 +203,7 @@ class PartitionableSourceITCase extends BatchTestBase {
     tEnv
       .asInstanceOf[TestingTableEnvironment]
       .getResourceManager
-      .registerJarResources(
+      .registerResources(
         Collections.singletonList(new ResourceUri(ResourceType.JAR, 
udfJarFile.toURI.toString)))
 
     tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'")

Reply via email to