This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ce477c31562 [FLINK-39081] Add ARTIFACT keyword option in CREATE
FUNCTION's USING clause
ce477c31562 is described below
commit ce477c31562dcd732e73359e81141b8628fab11c
Author: Mika Naylor <[email protected]>
AuthorDate: Fri Feb 13 14:28:50 2026 +0100
[FLINK-39081] Add ARTIFACT keyword option in CREATE FUNCTION's USING clause
---
.../sql/hive-compatibility/hive-dialect/create.md | 6 +-
docs/content.zh/docs/sql/reference/ddl/create.md | 2 +-
.../sql/hive-compatibility/hive-dialect/create.md | 6 +-
docs/content/docs/sql/reference/ddl/create.md | 8 +-
.../table/client/gateway/SingleSessionManager.java | 5 +-
.../client/resource/ClientResourceManager.java | 27 +++--
.../src/main/codegen/data/Parser.tdd | 2 +
.../src/main/codegen/includes/parserImpls.ftl | 12 ++-
.../sql/parser/ddl/resource/SqlResourceType.java | 3 +-
.../flink/sql/parser/utils/ParserResource.java | 5 +-
.../ParserResource.properties | 2 +-
.../flink/sql/parser/FlinkSqlParserImplTest.java | 117 ++++++++++++++-------
.../flink/table/catalog/FunctionCatalog.java | 21 ++--
.../table/operations/command/AddJarOperation.java | 2 +-
.../flink/table/resource/ResourceManager.java | 76 +++++++++----
.../flink/table/resource/ResourceManagerTest.java | 53 ++++++----
.../apache/flink/table/resource/ResourceType.java | 3 +-
.../planner/utils/OperationConverterUtils.java | 3 +
.../batch/sql/PartitionableSourceITCase.scala | 2 +-
19 files changed, 240 insertions(+), 115 deletions(-)
diff --git a/docs/content.zh/docs/sql/hive-compatibility/hive-dialect/create.md
b/docs/content.zh/docs/sql/hive-compatibility/hive-dialect/create.md
index ae357260f70..ef930f6687b 100644
--- a/docs/content.zh/docs/sql/hive-compatibility/hive-dialect/create.md
+++ b/docs/content.zh/docs/sql/hive-compatibility/hive-dialect/create.md
@@ -212,7 +212,7 @@ CREATE TEMPORARY MACRO simple_add (x int, y int) x + y;
#### Create Temporary Function
```sql
-CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR 'file_uri'];
+CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR|ARTIFACT
'file_uri'];
```
The function exists for the duration of the current session.
@@ -221,12 +221,12 @@ The function exists for the duration of the current
session.
```sql
CREATE FUNCTION [db_name.]function_name AS class_name
- [USING JAR 'file_uri'];
+ [USING [JAR|ARTIFACT] 'file_uri'];
```
The function is registered to metastore and will exist in all session unless
the function is dropped.
### Parameter
-- `[USING JAR 'file_uri']`
+- `[USING [JAR|ARTIFACT] 'file_uri']`
User can use the clause to add Jar that contains the implementation of the
function along with its dependencies while creating the function.
The `file_uri` can be on local file or distributed file system.
diff --git a/docs/content.zh/docs/sql/reference/ddl/create.md
b/docs/content.zh/docs/sql/reference/ddl/create.md
index e5314fec71e..cffd9d689a6 100644
--- a/docs/content.zh/docs/sql/reference/ddl/create.md
+++ b/docs/content.zh/docs/sql/reference/ddl/create.md
@@ -876,7 +876,7 @@ CREATE [TEMPORARY] VIEW [IF NOT EXISTS]
[catalog_name.][db_name.]view_name
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF NOT EXISTS] [[catalog_name.]db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
- [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]
+ [USING [JAR|ARTIFACT] '<path_to_filename>.jar' [, JAR
'<path_to_filename>.jar']* ]
[WITH (key1=val1, key2=val2, ...)]
```
diff --git a/docs/content/docs/sql/hive-compatibility/hive-dialect/create.md
b/docs/content/docs/sql/hive-compatibility/hive-dialect/create.md
index 356b4dd98fe..a2bc7a7a4d3 100644
--- a/docs/content/docs/sql/hive-compatibility/hive-dialect/create.md
+++ b/docs/content/docs/sql/hive-compatibility/hive-dialect/create.md
@@ -212,7 +212,7 @@ CREATE TEMPORARY MACRO simple_add (x int, y int) x + y;
#### Create Temporary Function
```sql
-CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR 'file_uri'];
+CREATE TEMPORARY FUNCTION function_name AS class_name [USING [JAR|ARTIFACT]
'file_uri'];
```
The function exists for the duration of the current session.
@@ -221,12 +221,12 @@ The function exists for the duration of the current
session.
```sql
CREATE FUNCTION [db_name.]function_name AS class_name
- [USING JAR 'file_uri'];
+ [USING [JAR|ARTIFACT] 'file_uri'];
```
The function is registered to metastore and will exist in all session unless
the function is dropped.
### Parameter
-- `[USING JAR 'file_uri']`
+- `[USING [JAR|ARTIFACT] 'file_uri']`
User can use the clause to add Jar that contains the implementation of the
function along with its dependencies while creating the function.
The `file_uri` can be on local file or distributed file system.
diff --git a/docs/content/docs/sql/reference/ddl/create.md
b/docs/content/docs/sql/reference/ddl/create.md
index fff1f392b78..b72fa7a2dea 100644
--- a/docs/content/docs/sql/reference/ddl/create.md
+++ b/docs/content/docs/sql/reference/ddl/create.md
@@ -857,10 +857,10 @@ If the view already exists, nothing happens.
## CREATE FUNCTION
```sql
-CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
- [IF NOT EXISTS] [catalog_name.][db_name.]function_name
- AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
- [USING JAR '<path_to_filename>.jar' [, JAR '<path_to_filename>.jar']* ]
+CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
+ [IF NOT EXISTS] [catalog_name.][db_name.]function_name
+ AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
+ [USING [JAR|ARTIFACT] '<path_to_filename>.jar' [, JAR
'<path_to_filename>.jar']* ]
[WITH (key1=val1, key2=val2, ...)]
```
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
index 99dd4e72ed2..2c1cb101209 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SingleSessionManager.java
@@ -188,7 +188,7 @@ public class SingleSessionManager implements SessionManager
{
ClientResourceManager resourceManager =
new ClientResourceManager(configuration, userClassLoader);
try {
- resourceManager.registerJarResources(
+ resourceManager.registerResources(
dependencies.stream()
.map(uri -> new ResourceUri(ResourceType.JAR,
uri.toString()))
.collect(Collectors.toList()));
@@ -221,7 +221,8 @@ public class SingleSessionManager implements SessionManager
{
protected ResultFetcher callRemoveJar(OperationHandle operationHandle,
String jarPath) {
URL jarURL =
((ClientResourceManager)
sessionContext.getSessionState().resourceManager)
- .unregisterJarResource(jarPath);
+ .unregisterResource(
+ jarPath, List.of(ResourceType.JAR,
ResourceType.ARTIFACT));
if (jarURL != null) {
((ClientWrapperClassLoader)
sessionContext
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
index 13e64b6f499..df9aa45e39a 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
@@ -31,6 +31,9 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URL;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
/**
* The {@link ClientResourceManager} is able to remove the registered JAR
resources with the
@@ -48,15 +51,27 @@ public class ClientResourceManager extends ResourceManager {
}
@Nullable
- public URL unregisterJarResource(String jarPath) {
- Path path = new Path(jarPath);
+ public URL unregisterResource(String resourcePath, List<ResourceType>
expectedTypes) {
+ Path path = new Path(resourcePath);
try {
- checkPath(path, ResourceType.JAR);
- return resourceInfos.remove(
- new ResourceUri(ResourceType.JAR,
getURLFromPath(path).getPath()));
+ checkPath(path, expectedTypes);
+ String urlPath = getURLFromPath(path).getPath();
+
+ return expectedTypes.stream()
+ .map(type -> resourceInfos.remove(new ResourceUri(type,
urlPath)))
+ .filter(Objects::nonNull)
+ .findFirst()
+ .orElse(null);
} catch (IOException e) {
throw new SqlExecutionException(
- String.format("Failed to unregister the jar resource
[%s]", jarPath), e);
+ String.format(
+ "Failed to unregister the %s resource [%s]",
+ expectedTypes.stream()
+ .map(ResourceType::name)
+ .map(String::toLowerCase)
+ .collect(Collectors.joining(" or ")),
+ resourcePath),
+ e);
}
}
}
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index f0582dfe31e..c6269db8c83 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -194,6 +194,7 @@
# Please keep the keyword in alphabetical order if new keyword is added.
keywords: [
"ANALYZE"
+ "ARTIFACT"
"BUCKETS"
"BYTES"
"CATALOGS"
@@ -273,6 +274,7 @@
"ALWAYS"
"APPLY"
"ARRAY_AGG"
+ "ARTIFACT"
"ASC"
"ASSERTION"
"ASSIGNMENT"
diff --git
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index ad370a6db62..cdf2767a212 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -450,7 +450,7 @@ SqlCreate SqlCreateFunction(Span s, boolean replace,
boolean isTemporary) :
if ("SQL".equals(functionLanguage) ||
"PYTHON".equals(functionLanguage)) {
throw SqlUtil.newContextException(
functionLanguagePos,
-
ParserResource.RESOURCE.createFunctionUsingJar(functionLanguage));
+ ParserResource.RESOURCE.createFunction(functionLanguage));
}
List<SqlNode> resourceList = new ArrayList<SqlNode>();
SqlResource sqlResource = null;
@@ -492,13 +492,21 @@ SqlResource SqlResourceInfo() :
String resourcePath;
}
{
- <JAR> <QUOTED_STRING> {
+ (<JAR> <QUOTED_STRING> {
resourcePath = SqlParserUtil.parseString(token.image);
return new SqlResource(
getPos(),
SqlResourceType.JAR.symbol(getPos()),
SqlLiteral.createCharString(resourcePath, getPos()));
}
+ |
+ <ARTIFACT> <QUOTED_STRING> {
+ resourcePath = SqlParserUtil.parseString(token.image);
+ return new SqlResource(
+ getPos(),
+ SqlResourceType.ARTIFACT.symbol(getPos()),
+ SqlLiteral.createCharString(resourcePath, getPos()));
+ })
}
SqlDrop SqlDropFunction(Span s, boolean replace, boolean isTemporary) :
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
index 87aaea34fc1..cb33fdae875 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/resource/SqlResourceType.java
@@ -25,7 +25,8 @@ import org.apache.calcite.sql.parser.SqlParserPos;
public enum SqlResourceType {
FILE("FILE"),
JAR("JAR"),
- ARCHIVE("ARCHIVE");
+ ARCHIVE("ARCHIVE"),
+ ARTIFACT("ARTIFACT");
private final String digest;
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 88a960259c3..323fe5da0aa 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -57,8 +57,9 @@ public interface ParserResource {
"Columns identifiers without types in the schema are supported on
CTAS/RTAS statements only.")
Resources.ExInst<ParseException> columnsIdentifiersUnsupported();
- @Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable
to {0} language.")
- Resources.ExInst<ParseException> createFunctionUsingJar(String language);
+ @Resources.BaseMessage(
+ "CREATE FUNCTION USING JAR/ARTIFACT syntax is not applicable to
{0} language.")
+ Resources.ExInst<ParseException> createFunction(String language);
@Resources.BaseMessage("WITH DRAIN could only be used after WITH
SAVEPOINT.")
Resources.ExInst<ParseException> withDrainOnlyUsedWithSavepoint();
diff --git
a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
index daf0b1c899a..5cd2982867d 100644
---
a/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
+++
b/flink-table/flink-sql-parser/src/main/resources/org.apache.flink.sql.parser.utils/ParserResource.properties
@@ -20,4 +20,4 @@ MultipleWatermarksUnsupported=Multiple WATERMARK declarations
are not supported
OverwriteIsOnlyUsedWithInsert=OVERWRITE expression is only used with INSERT
statement.
createSystemFunctionOnlySupportTemporary=CREATE SYSTEM FUNCTION is not
supported, system functions can only be registered as temporary function, you
can use CREATE TEMPORARY SYSTEM FUNCTION instead.
explainDetailIsDuplicate=Duplicate EXPLAIN DETAIL is not allowed.
-createFunctionUsingJar=CREATE FUNCTION USING JAR syntax is not applicable to
{0} language.
+createFunction=CREATE FUNCTION USING JAR/ARTIFACT syntax is not applicable to
{0} language.
diff --git
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index a631156f387..a65ad52c566 100644
---
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import java.util.List;
import java.util.Locale;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -2509,49 +2510,91 @@ class FlinkSqlParserImplTest extends SqlParserTest {
+ "system functions can only be registered as
temporary "
+ "functions, you can use CREATE TEMPORARY
SYSTEM FUNCTION instead.");
- // test create function using jar
- sql("create temporary function function1 as
'org.apache.flink.function.function1' language java using jar
'file:///path/to/test.jar'")
- .ok(
- "CREATE TEMPORARY FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR
'file:///path/to/test.jar'");
-
- sql("create temporary function function1 as
'org.apache.flink.function.function1' language scala using jar
'/path/to/test.jar'")
- .ok(
- "CREATE TEMPORARY FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE SCALA USING JAR
'/path/to/test.jar'");
-
- sql("create temporary system function function1 as
'org.apache.flink.function.function1' language scala using jar
'/path/to/test.jar'")
- .ok(
- "CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE SCALA USING JAR
'/path/to/test.jar'");
-
- sql("create function function1 as
'org.apache.flink.function.function1' language java using jar
'file:///path/to/test.jar', jar 'hdfs:///path/to/test2.jar'")
- .ok(
- "CREATE FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR
'file:///path/to/test.jar', JAR 'hdfs:///path/to/test2.jar'");
-
- sql("create temporary function function1 as
'org.apache.flink.function.function1' language ^sql^ using jar
'file:///path/to/test.jar'")
- .fails("CREATE FUNCTION USING JAR syntax is not applicable to
SQL language.");
-
- sql("create temporary function function1 as
'org.apache.flink.function.function1' language ^python^ using jar
'file:///path/to/test.jar'")
- .fails("CREATE FUNCTION USING JAR syntax is not applicable to
PYTHON language.");
+ // test creating functions with either jar or artifact
+ for (String usageType : List.of("JAR", "ARTIFACT")) {
+ sql("create temporary function function1 as
'org.apache.flink.function.function1' language java using "
+ + usageType
+ + " 'file:///path/to/test.jar'")
+ .ok(
+ "CREATE TEMPORARY FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE JAVA USING "
+ + usageType
+ + " 'file:///path/to/test.jar'");
+
+ sql("create temporary function function1 as
'org.apache.flink.function.function1' language scala using "
+ + usageType
+ + " '/path/to/test.jar'")
+ .ok(
+ "CREATE TEMPORARY FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE SCALA USING "
+ + usageType
+ + " '/path/to/test.jar'");
+
+ sql("create temporary system function function1 as
'org.apache.flink.function.function1' language scala using "
+ + usageType
+ + " '/path/to/test.jar'")
+ .ok(
+ "CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE SCALA USING "
+ + usageType
+ + " '/path/to/test.jar'");
+
+ sql("create function function1 as
'org.apache.flink.function.function1' language java using "
+ + usageType
+ + " 'file:///path/to/test.jar', jar
'hdfs:///path/to/test2.jar'")
+ .ok(
+ "CREATE FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE JAVA USING "
+ + usageType
+ + " 'file:///path/to/test.jar', JAR
'hdfs:///path/to/test2.jar'");
+
+ sql("create temporary function function1 as
'org.apache.flink.function.function1' language ^sql^ using "
+ + usageType
+ + " 'file:///path/to/test.jar'")
+ .fails(
+ "CREATE FUNCTION USING JAR/ARTIFACT syntax is not
applicable to SQL language.");
+
+ sql("create temporary function function1 as
'org.apache.flink.function.function1' language ^python^ using "
+ + usageType
+ + " 'file:///path/to/test.jar'")
+ .fails(
+ "CREATE FUNCTION USING JAR/ARTIFACT syntax is not
applicable to PYTHON language.");
+
+ sql("create function function1 as
'org.apache.flink.function.function1' language java using "
+ + usageType
+ + " 'file:///path/to/test.jar' WITH ('k1' = 'v1',
'k2' = 'v2')")
+ .ok(
+ "CREATE FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE JAVA USING "
+ + usageType
+ + " 'file:///path/to/test.jar'\nWITH (\n"
+ + " 'k1' = 'v1',\n"
+ + " 'k2' = 'v2'\n"
+ + ")");
+
+ sql("create temporary function function1 as
'org.apache.flink.function.function1' language java using "
+ + usageType
+ + " 'file:///path/to/test.jar' WITH ('k1' = 'v1',
'k2' = 'v2')")
+ .ok(
+ "CREATE TEMPORARY FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE JAVA USING "
+ + usageType
+ + " 'file:///path/to/test.jar'\nWITH (\n"
+ + " 'k1' = 'v1',\n"
+ + " 'k2' = 'v2'\n"
+ + ")");
+ }
+
+ // test mixing jar and artifact keywords
+ sql("create function function1 as
'org.apache.flink.function.function1' language java using jar
'file:///path/to/test.jar', artifact 'hdfs:///path/to/test2.jar'")
+ .ok(
+ "CREATE FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR
'file:///path/to/test.jar', ARTIFACT 'hdfs:///path/to/test2.jar'");
+
+ sql("create function function1 as
'org.apache.flink.function.function1' language java using artifact
'file:///path/to/test.jar', jar 'hdfs:///path/to/test2.jar'")
+ .ok(
+ "CREATE FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE JAVA USING ARTIFACT
'file:///path/to/test.jar', JAR 'hdfs:///path/to/test2.jar'");
sql("create temporary function function1 as
'org.apache.flink.function.function1' language java using ^file^
'file:///path/to/test'")
.fails(
"Encountered \"file\" at line 1, column 98.\n"
- + "Was expecting:\n"
+ + "Was expecting one of:\n"
+ + " \"ARTIFACT\" ...\n"
+ " \"JAR\" ...\n"
+ " .*");
-
- sql("create function function1 as
'org.apache.flink.function.function1' language java using jar
'file:///path/to/test.jar' WITH ('k1' = 'v1', 'k2' = 'v2')")
- .ok(
- "CREATE FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR
'file:///path/to/test.jar'\nWITH (\n"
- + " 'k1' = 'v1',\n"
- + " 'k2' = 'v2'\n"
- + ")");
-
- sql("create temporary function function1 as
'org.apache.flink.function.function1' language java using jar
'file:///path/to/test.jar' WITH ('k1' = 'v1', 'k2' = 'v2')")
- .ok(
- "CREATE TEMPORARY FUNCTION `FUNCTION1` AS
'org.apache.flink.function.function1' LANGUAGE JAVA USING JAR
'file:///path/to/test.jar'\nWITH (\n"
- + " 'k1' = 'v1',\n"
- + " 'k2' = 'v2'\n"
- + ")");
}
@Test
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index d0d72467c33..ccc5495c2b0 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -141,12 +141,12 @@ public final class FunctionCatalog {
"Could not drop temporary system function. A
function named '%s' doesn't exist.",
name));
}
- unregisterFunctionJarResources(function);
+ unregisterFunctionResources(function);
return function != null;
}
- private void unregisterFunctionJarResources(@Nullable CatalogFunction
function) {
+ private void unregisterFunctionResources(@Nullable CatalogFunction
function) {
if (function != null && function.getFunctionLanguage() ==
FunctionLanguage.JAVA) {
resourceManager.unregisterFunctionResources(function.getFunctionResources());
}
@@ -509,7 +509,7 @@ public final class FunctionCatalog {
.getTemporaryOperationListener(normalizedName)
.ifPresent(l ->
l.onDropTemporaryFunction(normalizedName.toObjectPath()));
tempCatalogFunctions.remove(normalizedName);
- unregisterFunctionJarResources(fd);
+ unregisterFunctionResources(fd);
} else if (!ignoreIfNotExist) {
throw new ValidationException(
String.format("Temporary catalog function %s doesn't
exist", identifier));
@@ -601,8 +601,7 @@ public final class FunctionCatalog {
CatalogFunction potentialResult =
tempCatalogFunctions.get(normalizedIdentifier);
if (potentialResult != null) {
- registerFunctionJarResources(
- oi.asSummaryString(),
potentialResult.getFunctionResources());
+ registerFunctionResources(oi.asSummaryString(),
potentialResult.getFunctionResources());
return Optional.of(
ContextResolvedFunction.temporary(
FunctionIdentifier.of(oi),
@@ -622,7 +621,7 @@ public final class FunctionCatalog {
FunctionDefinition fd;
if (catalog.getFunctionDefinitionFactory().isPresent()
&& catalogFunction.getFunctionLanguage() !=
FunctionLanguage.PYTHON) {
- registerFunctionJarResources(
+ registerFunctionResources(
oi.asSummaryString(),
catalogFunction.getFunctionResources());
fd =
catalog.getFunctionDefinitionFactory()
@@ -656,7 +655,7 @@ public final class FunctionCatalog {
String normalizedName = FunctionIdentifier.normalizeName(funcName);
if (tempSystemFunctions.containsKey(normalizedName)) {
CatalogFunction function = tempSystemFunctions.get(normalizedName);
- registerFunctionJarResources(funcName,
function.getFunctionResources());
+ registerFunctionResources(funcName,
function.getFunctionResources());
return Optional.of(
ContextResolvedFunction.temporary(
FunctionIdentifier.of(funcName),
@@ -732,7 +731,7 @@ public final class FunctionCatalog {
}
// If the jar resource of UDF used is not empty, register it to
classloader before
// validate.
- registerFunctionJarResources(name, function.getFunctionResources());
+ registerFunctionResources(name, function.getFunctionResources());
return UserDefinedFunctionHelper.instantiateFunction(
resourceManager.getUserClassLoader(),
@@ -742,15 +741,15 @@ public final class FunctionCatalog {
function);
}
- public void registerFunctionJarResources(String functionName,
List<ResourceUri> resourceUris) {
+ public void registerFunctionResources(String functionName,
List<ResourceUri> resourceUris) {
try {
if (!resourceUris.isEmpty()) {
- resourceManager.registerJarResources(resourceUris);
+ resourceManager.registerResources(resourceUris);
}
} catch (Exception e) {
throw new TableException(
String.format(
- "Failed to register jar resource '%s' of function
'%s'.",
+ "Failed to register resource '%s' of function
'%s'.",
resourceUris, functionName),
e);
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
index 03f40658bd0..02638cbfa77 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/AddJarOperation.java
@@ -53,7 +53,7 @@ public class AddJarOperation implements Operation,
ExecutableOperation {
public TableResultInternal execute(Context ctx) {
ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, getPath());
try {
-
ctx.getResourceManager().registerJarResources(Collections.singletonList(resourceUri));
+
ctx.getResourceManager().registerResources(Collections.singletonList(resourceUri));
return TableResultImpl.TABLE_RESULT_OK;
} catch (IOException e) {
throw new TableException(
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
index d2a71edaf1f..4edfacf09ae 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
@@ -118,20 +118,21 @@ public class ResourceManager implements Closeable {
* Due to anyone of the resource in list maybe fail during register, so we
should stage it
* before actual register to guarantee transaction process. If all the
resources are available,
* register them into the {@link ResourceManager}.
+ *
+ * <p>Accepts both JAR and ARTIFACT resources.
*/
- public void registerJarResources(List<ResourceUri> resourceUris) throws
IOException {
+ public void registerResources(List<ResourceUri> resourceUris) throws
IOException {
registerResources(
prepareStagingResources(
resourceUris,
- ResourceType.JAR,
+ List.of(ResourceType.JAR, ResourceType.ARTIFACT),
true,
url -> {
try {
JarUtils.checkJarFile(url);
} catch (IOException e) {
throw new ValidationException(
- String.format("Failed to register jar
resource [%s]", url),
- e);
+ String.format("Failed to register
resource [%s]", url), e);
}
},
false),
@@ -171,7 +172,7 @@ public class ResourceManager implements Closeable {
public void declareFunctionResources(Set<ResourceUri> resourceUris) throws
IOException {
prepareStagingResources(
resourceUris,
- ResourceType.JAR,
+ List.of(ResourceType.JAR, ResourceType.ARTIFACT),
true,
url -> {
try {
@@ -235,7 +236,11 @@ public class ResourceManager implements Closeable {
*/
public Set<URL> getLocalJarResources() {
return resourceInfos.entrySet().stream()
- .filter(entry ->
ResourceType.JAR.equals(entry.getKey().getResourceType()))
+ .filter(
+ entry ->
+
ResourceType.JAR.equals(entry.getKey().getResourceType())
+ || ResourceType.ARTIFACT.equals(
+
entry.getKey().getResourceType()))
.map(Map.Entry::getValue)
.collect(Collectors.toSet());
}
@@ -340,29 +345,38 @@ public class ResourceManager implements Closeable {
// ------------------------------------------------------------------------
protected void checkPath(Path path, ResourceType expectedType) throws
IOException {
+ checkPath(path, Collections.singletonList(expectedType));
+ }
+
+ protected void checkPath(Path path, List<ResourceType> expectedTypes)
throws IOException {
FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
// check resource exists firstly
if (!fs.exists(path)) {
throw new FileNotFoundException(
String.format(
"%s resource [%s] not found.",
- expectedType.name().toLowerCase(), path));
+ expectedTypes.stream()
+ .map(ResourceType::name)
+ .map(String::toLowerCase)
+ .collect(Collectors.joining(" or ")),
+ path));
}
// register directory is not allowed for resource
if (fs.getFileStatus(path).isDir()) {
throw new ValidationException(
String.format(
- "The registering or unregistering %s resource [%s]
is a directory that is not allowed.",
- expectedType.name().toLowerCase(), path));
+ "The registering or unregistering resource [%s] is
a directory, which is not allowed.",
+ path));
}
- if (expectedType == ResourceType.JAR) {
+ if (expectedTypes.contains(ResourceType.JAR)
+ || expectedTypes.contains(ResourceType.ARTIFACT)) {
// file name should end with .jar suffix
String fileExtension = Files.getFileExtension(path.getName());
if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
throw new ValidationException(
String.format(
- "The registering or unregistering jar resource
[%s] must ends with '.jar' suffix.",
+ "The registering or unregistering resource
[%s] must end with a '.jar' suffix.",
path));
}
}
@@ -457,19 +471,24 @@ public class ResourceManager implements Closeable {
return Collections.emptyMap();
}
- private void checkResources(Collection<ResourceUri> resourceUris,
ResourceType expectedType)
+ private void checkResources(
+ Collection<ResourceUri> resourceUris, List<ResourceType>
expectedTypes)
throws IOException {
- // check the resource type
+ // check the resource types
if (resourceUris.stream()
- .anyMatch(resourceUri -> expectedType !=
resourceUri.getResourceType())) {
+ .anyMatch(resourceUri ->
!expectedTypes.contains(resourceUri.getResourceType()))) {
throw new ValidationException(
String.format(
"Expect the resource type to be %s, but encounter
a resource %s.",
- expectedType.name().toLowerCase(),
+ expectedTypes.stream()
+ .map(ResourceType::name)
+ .map(String::toLowerCase)
+ .collect(Collectors.joining(" or ")),
resourceUris.stream()
.filter(
resourceUri ->
- expectedType !=
resourceUri.getResourceType())
+ !expectedTypes.contains(
+
resourceUri.getResourceType()))
.findFirst()
.map(
resourceUri ->
@@ -485,7 +504,7 @@ public class ResourceManager implements Closeable {
// check the resource path
for (ResourceUri resourceUri : resourceUris) {
- checkPath(new Path(resourceUri.getUri()), expectedType);
+ checkPath(new Path(resourceUri.getUri()), expectedTypes);
}
}
@@ -496,7 +515,22 @@ public class ResourceManager implements Closeable {
Consumer<URL> resourceChecker,
boolean declareFunctionResource)
throws IOException {
- checkResources(resourceUris, expectedType);
+ return prepareStagingResources(
+ resourceUris,
+ Collections.singletonList(expectedType),
+ executable,
+ resourceChecker,
+ declareFunctionResource);
+ }
+
+ private Map<ResourceUri, URL> prepareStagingResources(
+ Collection<ResourceUri> resourceUris,
+ List<ResourceType> expectedTypes,
+ boolean executable,
+ Consumer<URL> resourceChecker,
+ boolean declareFunctionResource)
+ throws IOException {
+ checkResources(resourceUris, expectedTypes);
Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>();
boolean supportOverwrite = !executable;
@@ -514,7 +548,8 @@ public class ResourceManager implements Closeable {
URL localUrl;
ResourceUri localResourceUri = resourceUri;
- if (expectedType == ResourceType.JAR
+ if ((expectedTypes.contains(ResourceType.JAR)
+ || expectedTypes.contains(ResourceType.ARTIFACT))
&& functionResourceInfos.containsKey(resourceUri)) {
// Get local url from function resource infos.
localUrl = functionResourceInfos.get(resourceUri).url;
@@ -532,7 +567,8 @@ public class ResourceManager implements Closeable {
localUrl = getURLFromPath(path);
// if the local resource is a relative path, here convert
it to an absolute path
// before register
- localResourceUri = new ResourceUri(expectedType,
localUrl.getPath());
+ localResourceUri =
+ new ResourceUri(resourceUri.getResourceType(),
localUrl.getPath());
}
// check the local file
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
index 0b14e7f8de6..d1bea7f2c77 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
@@ -55,6 +55,7 @@ import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -84,6 +85,8 @@ public class ResourceManagerTest {
@TempDir private static File tempFolder;
private static File udfJar;
+ private static File udfArtifact;
+
private static File file;
private ResourceManager resourceManager;
@@ -93,7 +96,13 @@ public class ResourceManagerTest {
udfJar =
UserClassLoaderJarTestUtils.createJarFile(
tempFolder,
- "test-classloader-udf.jar",
+ "test-classloader-udf-jar.jar",
+ GENERATED_LOWER_UDF_CLASS,
+ String.format(GENERATED_LOWER_UDF_CODE,
GENERATED_LOWER_UDF_CLASS));
+ udfArtifact =
+ UserClassLoaderJarTestUtils.createJarFile(
+ tempFolder,
+ "test-classloader-udf-artifact.jar",
GENERATED_LOWER_UDF_CLASS,
String.format(GENERATED_LOWER_UDF_CODE,
GENERATED_LOWER_UDF_CLASS));
file = File.createTempFile("ResourceManagerTest", ".txt", tempFolder);
@@ -114,7 +123,7 @@ public class ResourceManagerTest {
}
@Test
- public void testRegisterJarResource() throws Exception {
+ public void testRegisterResources() throws Exception {
URLClassLoader userClassLoader = resourceManager.getUserClassLoader();
// test class loading before register resource
@@ -123,14 +132,20 @@ public class ResourceManagerTest {
ClassNotFoundException.class,
() -> Class.forName(GENERATED_LOWER_UDF_CLASS, false,
userClassLoader));
- ResourceUri resourceUri = new ResourceUri(ResourceType.JAR,
udfJar.getPath());
- // register the same jar repeatedly
- resourceManager.registerJarResources(Arrays.asList(resourceUri,
resourceUri));
+ ResourceUri jarResourceUri = new ResourceUri(ResourceType.JAR,
udfJar.getPath());
+ ResourceUri artifactResourceUri =
+ new ResourceUri(ResourceType.ARTIFACT, udfArtifact.getPath());
+
+ // register the same artifacts repeatedly
+ resourceManager.registerResources(Arrays.asList(jarResourceUri,
artifactResourceUri));
+ resourceManager.registerResources(Arrays.asList(jarResourceUri,
artifactResourceUri));
// assert resource infos
- Map<ResourceUri, URL> expected =
- Collections.singletonMap(
- resourceUri, resourceManager.getURLFromPath(new
Path(udfJar.getPath())));
+ Map<ResourceUri, URL> expected = new HashMap<>();
+ expected.put(jarResourceUri, resourceManager.getURLFromPath(new
Path(udfJar.getPath())));
+ expected.put(
+ artifactResourceUri,
+ resourceManager.getURLFromPath(new
Path(udfArtifact.getPath())));
assertEquals(expected, resourceManager.getResources());
@@ -199,7 +214,7 @@ public class ResourceManagerTest {
.relativize(udfJar.toPath())
.toString());
// register jar
-
resourceManager.registerJarResources(Collections.singletonList(resourceUri));
+
resourceManager.registerResources(Collections.singletonList(resourceUri));
// assert resource infos
Map<ResourceUri, URL> expected =
@@ -217,16 +232,16 @@ public class ResourceManagerTest {
}
@Test
- public void testRegisterInvalidJarResource() throws Exception {
+ public void testRegisterInvalidResource() throws Exception {
final String fileUri = file.getPath();
CommonTestUtils.assertThrows(
String.format(
- "Expect the resource type to be jar, but encounter a
resource [%s] with type %s.",
+ "Expect the resource type to be jar or artifact, but
encounter a resource [%s] with type %s.",
fileUri, ResourceType.FILE.name().toLowerCase()),
ValidationException.class,
() -> {
- resourceManager.registerJarResources(
+ resourceManager.registerResources(
Collections.singletonList(new
ResourceUri(ResourceType.FILE, fileUri)));
return null;
});
@@ -234,11 +249,11 @@ public class ResourceManagerTest {
// test register jar resource with invalid suffix
CommonTestUtils.assertThrows(
String.format(
- "The registering or unregistering jar resource [%s]
must ends with '.jar' suffix.",
+ "The registering or unregistering resource [%s] must
end with a '.jar' suffix.",
fileUri),
ValidationException.class,
() -> {
- resourceManager.registerJarResources(
+ resourceManager.registerResources(
Collections.singletonList(new
ResourceUri(ResourceType.JAR, fileUri)));
return null;
});
@@ -248,11 +263,11 @@ public class ResourceManagerTest {
CommonTestUtils.assertThrows(
String.format(
- "The registering or unregistering jar resource [%s] is
a directory that is not allowed.",
+ "The registering or unregistering resource [%s] is a
directory, which is not allowed.",
jarDir),
ValidationException.class,
() -> {
- resourceManager.registerJarResources(
+ resourceManager.registerResources(
Collections.singletonList(new
ResourceUri(ResourceType.JAR, jarDir)));
return null;
});
@@ -263,11 +278,11 @@ public class ResourceManagerTest {
CommonTestUtils.assertThrows(
String.format(
- "The registering or unregistering jar resource [%s] is
a directory that is not allowed.",
+ "The registering or unregistering resource [%s] is a
directory, which is not allowed.",
jarPath),
ValidationException.class,
() -> {
- resourceManager.registerJarResources(
+ resourceManager.registerResources(
Collections.singletonList(new
ResourceUri(ResourceType.JAR, jarPath)));
return null;
});
@@ -446,7 +461,7 @@ public class ResourceManagerTest {
void testCloseCopiedResourceManager() throws Exception {
ResourceUri resourceUri = new ResourceUri(ResourceType.JAR,
udfJar.getPath());
resourceManager.declareFunctionResources(Collections.singleton(resourceUri));
-
resourceManager.registerJarResources(Collections.singletonList(resourceUri));
+
resourceManager.registerResources(Collections.singletonList(resourceUri));
assertThat(resourceManager.functionResourceInfos().size()).isEqualTo(1);
assertThat(resourceManager.resourceInfos.size()).isEqualTo(1);
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
index 15bbdd6656e..5f81b6d1e58 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/resource/ResourceType.java
@@ -25,5 +25,6 @@ import org.apache.flink.annotation.PublicEvolving;
public enum ResourceType {
FILE,
JAR,
- ARCHIVE
+ ARCHIVE,
+ ARTIFACT,
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 82a3a15724a..9bfa2dd7549 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -196,6 +196,9 @@ public class OperationConverterUtils {
case JAR:
resourceType = ResourceType.JAR;
break;
+ case ARTIFACT:
+ resourceType = ResourceType.ARTIFACT;
+ break;
case ARCHIVE:
resourceType = ResourceType.ARCHIVE;
break;
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
index 9f657eddb72..832c4f7a9c1 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
@@ -203,7 +203,7 @@ class PartitionableSourceITCase extends BatchTestBase {
tEnv
.asInstanceOf[TestingTableEnvironment]
.getResourceManager
- .registerJarResources(
+ .registerResources(
Collections.singletonList(new ResourceUri(ResourceType.JAR,
udfJarFile.toURI.toString)))
tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'")