This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4762ff0bc6 [core] support function: add engine support function (#5604)
4762ff0bc6 is described below
commit 4762ff0bc6e5c352493d17eb8c59686afd512edb
Author: jerry <[email protected]>
AuthorDate: Thu May 29 17:42:53 2025 +0800
[core] support function: add engine support function (#5604)
---
docs/content/concepts/functions.md | 120 +++++++++++++
docs/content/concepts/spec/_index.md | 2 +-
docs/content/flink/procedures.md | 61 ++++++-
docs/content/spark/procedures.md | 59 ++++++
docs/static/rest-catalog-open-api.yaml | 13 +-
.../apache/paimon/function/FunctionDefinition.java | 84 ++++++---
.../org/apache/paimon/function/FunctionImpl.java | 10 ++
.../apache/paimon/rest/auth/DLFAuthProvider.java | 5 +-
.../org/apache/paimon/utils/ParameterUtils.java | 19 ++
.../org/apache/paimon/rest/MockRESTMessage.java | 6 +-
.../org/apache/paimon/rest/RESTCatalogServer.java | 5 +-
.../apache/paimon/rest/auth/AuthProviderTest.java | 4 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 117 ++++++++++--
.../flink/procedure/AlterFunctionProcedure.java | 62 +++++++
.../flink/procedure/CreateFunctionProcedure.java | 89 ++++++++++
.../flink/procedure/DropFunctionProcedure.java | 52 ++++++
.../services/org.apache.paimon.factories.Factory | 3 +
.../org/apache/paimon/flink/RESTCatalogITCase.java | 48 +++++
.../flink/procedure/FunctionProcedureITCase.java | 64 +++++++
.../spark/JavaLambdaStringToMethodConverter.java | 107 +++++++++++
.../apache/paimon/spark/LambdaScalarFunction.java | 76 ++++++++
.../paimon/spark/PaimonSparkScalarFunction.java | 97 ++++++++++
.../java/org/apache/paimon/spark/SparkCatalog.java | 85 ++++++++-
.../org/apache/paimon/spark/SparkProcedures.java | 6 +
.../paimon/spark/catalog/SupportFunction.java | 62 -------
.../spark/procedure/AlterFunctionProcedure.java | 106 +++++++++++
.../spark/procedure/CreateFunctionProcedure.java | 137 ++++++++++++++
.../spark/procedure/DropFunctionProcedure.java | 97 ++++++++++
.../apache/paimon/spark/utils/CatalogUtils.java | 196 ++++++++++++++++++++
.../paimon/spark/utils/CatalogUtilsTest.java | 58 ++++++
.../paimon/spark/SparkCatalogWithHiveTest.java | 197 ++++++++++-----------
.../paimon/spark/SparkCatalogWithRestTest.java | 174 ++++++++++++++++--
.../spark/procedure/FunctionProcedureTest.scala | 64 +++++++
33 files changed, 2057 insertions(+), 228 deletions(-)
diff --git a/docs/content/concepts/functions.md
b/docs/content/concepts/functions.md
new file mode 100644
index 0000000000..11d05f590b
--- /dev/null
+++ b/docs/content/concepts/functions.md
@@ -0,0 +1,120 @@
+---
+title: "Functions"
+weight: 9
+type: docs
+aliases:
+- /concepts/functions.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Functions
+
+Paimon introduces a Function abstraction designed to support functions in a standard format across compute engines, addressing:
+
+- **Unified Column-Level Filtering and Processing:** Facilitates operations at
the column level, including tasks such as encryption and decryption of data.
+
+- **Parameterized View Capabilities:** Supports parameterized operations within views, making data retrieval more dynamic and flexible.
+
+## Types of Functions Supported
+
+Currently, Paimon supports three types of functions:
+
+1. **File Function:** Users can define functions in external files (for example, JAR resources), providing flexible and modular function definitions.
+
+2. **Lambda Function:** Users can define functions as Java lambda expressions, enabling inline, concise, functional-style operations.
+
+3. **SQL Function:** Users can define functions directly in SQL, integrating seamlessly with SQL-based data processing.
+
+## File Function Usage in Flink
+
+Paimon functions can be used within Apache Flink to perform complex data operations. Below are the SQL commands for creating, altering, and dropping functions in a Flink environment.
+
+### Create Function
+
+To create a new function in Flink SQL:
+
+```sql
+-- Flink SQL
+CREATE FUNCTION mydb.parse_str
+ AS 'com.streaming.flink.udf.StrUdf'
+ LANGUAGE JAVA
+ USING JAR 'oss://my_bucket/my_location/udf.jar' [, JAR
'oss://my_bucket/my_location/a.jar'];
+```
+
+This statement creates a Java-based user-defined function named `parse_str` in the `mydb` database, using the specified JAR files from object storage.
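+
+The class itself is not part of Paimon; it ships in the JAR you reference. A minimal sketch of what `com.streaming.flink.udf.StrUdf` might look like, assuming a standard Flink scalar UDF (the name and logic here are illustrative):
+
+```java
+// Hypothetical sketch of the class packaged in udf.jar; the name and logic
+// are illustrative. Flink invokes eval() once per input row.
+package com.streaming.flink.udf;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+public class StrUdf extends ScalarFunction {
+    public String eval(String input) {
+        return input == null ? null : input.trim().toUpperCase();
+    }
+}
+```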
+
+### Alter Function
+
+To modify an existing function in Flink SQL:
+
+```sql
+-- Flink SQL
+ALTER FUNCTION mydb.parse_str
+ AS 'com.streaming.flink.udf.StrUdf2'
+ LANGUAGE JAVA;
+```
+
+This command changes the implementation of the `parse_str` function to use a
new Java class definition.
+
+### Drop Function
+
+To remove a function from Flink SQL:
+
+```sql
+-- Flink SQL
+DROP FUNCTION mydb.parse_str;
+```
+
+This statement removes the existing `parse_str` function from the `mydb` database.
+
+## Lambda Function Usage in Spark
+
+### Create Function
+
+The procedure call below registers the function signature (its input and return parameters); an engine-specific definition is attached afterwards via `alter_function`:
+
+```sql
+-- Spark SQL
+CALL sys.create_function(`function` => 'my_db.area_func',
+ `inputParams` => '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1,
"name":"width", "type":"INT"}]',
+ `returnParams` => '[{"id": 0, "name":"area", "type":"BIGINT"}]',
+ `deterministic` => true,
+ `comment` => 'comment',
+ `options` => 'k1=v1,k2=v2'
+);
+```
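+
+Internally, the `inputParams` and `returnParams` strings are parsed into Paimon `DataField`s by the `ParameterUtils.parseDataFieldArray` helper added in this commit. A rough sketch of that parsing step (the wrapper class here is only for illustration):
+
+```java
+import java.util.List;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.ParameterUtils;
+
+public class ParamsSketch {
+    public static void main(String[] args) {
+        // The same JSON passed to create_function, parsed into typed fields.
+        List<DataField> inputs = ParameterUtils.parseDataFieldArray(
+                "[{\"id\": 0, \"name\":\"length\", \"type\":\"INT\"},"
+                        + " {\"id\": 1, \"name\":\"width\", \"type\":\"INT\"}]");
+        System.out.println(inputs);
+    }
+}
+```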
+
+### Alter Function
+
+```sql
+-- Spark SQL
+CALL sys.alter_function(`function` => 'my_db.area_func',
+ `change` => '{"action" : "addDefinition", "name" : "spark", "definition" :
{"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return
(long) length * width; }", "language": "JAVA" } }'
+);
+```
+Once the Spark definition has been added, the function can be invoked directly:
+
+```sql
+-- Spark SQL
+SELECT paimon.my_db.area_func(1, 2);
+```
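+
+Under the hood, the new `JavaLambdaStringToMethodConverter` compiles the lambda string at call time into a generated class named `"PaimonLambdaFunction" + functionName`, whose single static method is invoked per row. Roughly (a sketch; the exact Java return type is derived from the declared `BIGINT` result type):
+
+```java
+// Approximate shape of the class generated from the lambda string above;
+// the real class name and return type come from the function metadata.
+public class PaimonLambdaFunctionarea_func {
+    public static long apply(Integer length, Integer width) {
+        return (long) length * width;
+    }
+}
+```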
+
+### Drop Function
+
+```sql
+-- Spark SQL
+CALL sys.drop_function(`function` => 'my_db.area_func');
+```
diff --git a/docs/content/concepts/spec/_index.md
b/docs/content/concepts/spec/_index.md
index 03bcd72234..a1a1251c74 100644
--- a/docs/content/concepts/spec/_index.md
+++ b/docs/content/concepts/spec/_index.md
@@ -1,7 +1,7 @@
---
title: Specification
bookCollapseSection: true
-weight: 9
+weight: 10
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 1fa2399c79..1793c71ff9 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -166,7 +166,7 @@ All available procedures are listed below.
CALL sys.create_tag(`table` => 'default.T', tag => 'my_tag',
snapshot_id => cast(10 as bigint), time_retained => '1 d')
</td>
</tr>
- <tr>
+ <tr>
<td>create_tag_from_timestamp</td>
<td>
-- Create a tag from the first snapshot whose commit-time greater
than the specified timestamp. <br/>
@@ -833,5 +833,64 @@ All available procedures are listed below.
CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'drop')<br/><br/>
</td>
</tr>
+ <tr>
+ <td>create_function</td>
+ <td>
+ CALL [catalog.]sys.create_function(<br/>
+ 'function_identifier',<br/>
+ '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1,
"name":"width", "type":"INT"}]',<br/>
+ '[{"id": 0, "name":"area", "type":"BIGINT"}]',<br/>
+ true, 'comment', 'k1=v1,k2=v2')<br/>
+ </td>
+ <td>
+ To create a function. Arguments:
+ <li>function: the target function identifier. Cannot be empty.</li>
+        <li>inputParams: the input parameters of the function, as a JSON array of fields.</li>
+        <li>returnParams: the return parameters of the function, as a JSON array of fields.</li>
+ <li>deterministic: Whether the function is deterministic.</li>
+ <li>comment: The comment for the function.</li>
+ <li>options: the additional dynamic options of the function.</li>
+ </td>
+ <td>
+ CALL sys.create_function(`function` => 'function_identifier',<br/>
+ inputParams => '[{"id": 0, "name":"length", "type":"INT"},
{"id": 1, "name":"width", "type":"INT"}]',<br/>
+ returnParams => '[{"id": 0, "name":"area",
"type":"BIGINT"}]',<br/>
+ deterministic => true,<br/>
+ comment => 'comment',<br/>
+ options => 'k1=v1,k2=v2'<br/>
+ )<br/>
+ </td>
+ </tr>
+ <tr>
+ <td>alter_function</td>
+ <td>
+ CALL [catalog.]sys.alter_function(<br/>
+ 'function_identifier',<br/>
+ '{"action" : "addDefinition", "name" : "flink", "definition" :
{"type" : "file", "fileResources" : [{"resourceType": "JAR", "uri":
"oss://mybucket/xxxx.jar"}], "language": "JAVA", "className": "xxxx",
"functionName": "functionName" } }')<br/>
+ </td>
+ <td>
+ To alter a function. Arguments:
+ <li>function: the target function identifier. Cannot be empty.</li>
+        <li>change: the change to apply to the function, as a JSON object.</li>
+ </td>
+ <td>
+ CALL sys.alter_function(`function` => 'function_identifier',<br/>
+ `change` => '{"action" : "addDefinition", "name" : "flink",
"definition" : {"type" : "file", "fileResources" : [{"resourceType": "JAR",
"uri": "oss://mybucket/xxxx.jar"}], "language": "JAVA", "className": "xxxx",
"functionName": "functionName" } }'<br/>
+ )<br/>
+ </td>
+ </tr>
+ <tr>
+ <td>drop_function</td>
+ <td>
+ CALL [catalog.]sys.drop_function('function_identifier')<br/>
+ </td>
+ <td>
+ To drop a function. Arguments:
+ <li>function: the target function identifier. Cannot be empty.</li>
+ </td>
+ <td>
+ CALL sys.drop_function(`function` => 'function_identifier')<br/>
+ </td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 715c0d7037..b09a6b7fdd 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -406,6 +406,65 @@ This section introduce all available spark procedures
about paimon.
CALL sys.alter_view_dialect('view_identifier', 'drop', 'spark')<br/>
CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` =>
'drop')<br/><br/>
</td>
+ </tr>
+ <tr>
+ <td>create_function</td>
+ <td>
+ CALL sys.create_function(<br/>
+ 'function_identifier',<br/>
+ '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1,
"name":"width", "type":"INT"}]',<br/>
+ '[{"id": 0, "name":"area", "type":"BIGINT"}]',<br/>
+ true, 'comment', 'k1=v1,k2=v2')<br/>
+ </td>
+ <td>
+ To create a function. Arguments:
+ <li>function: the target function identifier. Cannot be empty.</li>
+        <li>inputParams: the input parameters of the function, as a JSON array of fields.</li>
+        <li>returnParams: the return parameters of the function, as a JSON array of fields.</li>
+ <li>deterministic: Whether the function is deterministic.</li>
+ <li>comment: The comment for the function.</li>
+ <li>options: the additional dynamic options of the function.</li>
+ </td>
+ <td>
+ CALL sys.create_function(`function` => 'function_identifier',<br/>
+ `inputParams` => '[{"id": 0, "name":"length", "type":"INT"},
{"id": 1, "name":"width", "type":"INT"}]',<br/>
+ `returnParams` => '[{"id": 0, "name":"area",
"type":"BIGINT"}]',<br/>
+ `deterministic` => true,<br/>
+ `comment` => 'comment',<br/>
+ `options` => 'k1=v1,k2=v2'<br/>
+ )<br/>
+ </td>
+ </tr>
+ <tr>
+ <td>alter_function</td>
+ <td>
+ CALL sys.alter_function(<br/>
+ 'function_identifier',<br/>
+ '{"action" : "addDefinition", "name" : "spark", "definition" :
{"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return
(long) length * width; }", "language": "JAVA" } }')<br/>
+ </td>
+ <td>
+ To alter a function. Arguments:
+ <li>function: the target function identifier. Cannot be empty.</li>
+        <li>change: the change to apply to the function, as a JSON object.</li>
+ </td>
+ <td>
+ CALL sys.alter_function(`function` => 'function_identifier',<br/>
+ `change` => '{"action" : "addDefinition", "name" : "spark",
"definition" : {"type" : "lambda", "definition" : "(Integer length, Integer
width) -> { return (long) length * width; }", "language": "JAVA" } }'<br/>
+ )<br/>
+ </td>
+ </tr>
+ <tr>
+ <td>drop_function</td>
+ <td>
+ CALL [catalog.]sys.drop_function('function_identifier')<br/>
+ </td>
+ <td>
+ To drop a function. Arguments:
+ <li>function: the target function identifier. Cannot be empty.</li>
+ </td>
+ <td>
+ CALL sys.drop_function(`function` => 'function_identifier')<br/>
+ </td>
</tr>
</tbody>
</table>
diff --git a/docs/static/rest-catalog-open-api.yaml
b/docs/static/rest-catalog-open-api.yaml
index 00878178c8..384bb6fe7a 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -1997,12 +1997,10 @@ components:
type:
type: string
const: "file"
- fileType:
- type: string
- storagePaths:
+ fileResources:
type: array
items:
- type: string
+ $ref: '#/components/schemas/FunctionFileResource'
language:
type: string
className:
@@ -2020,6 +2018,13 @@ components:
type: string
language:
type: string
+ FunctionFileResource:
+ type: object
+ properties:
+ resourceType:
+ type: string
+ uri:
+ type: string
ViewChange:
anyOf:
- $ref: '#/components/schemas/SetViewOption'
diff --git
a/paimon-api/src/main/java/org/apache/paimon/function/FunctionDefinition.java
b/paimon-api/src/main/java/org/apache/paimon/function/FunctionDefinition.java
index 5683797ee2..b0550ab939 100644
---
a/paimon-api/src/main/java/org/apache/paimon/function/FunctionDefinition.java
+++
b/paimon-api/src/main/java/org/apache/paimon/function/FunctionDefinition.java
@@ -46,13 +46,12 @@ import java.util.Objects;
public interface FunctionDefinition {
static FunctionDefinition file(
- String fileType,
- List<String> storagePaths,
+ List<FunctionFileResource> fileResources,
String language,
String className,
String functionName) {
return new FunctionDefinition.FileFunctionDefinition(
- fileType, storagePaths, language, className, functionName);
+ fileResources, language, className, functionName);
}
static FunctionDefinition sql(String definition) {
@@ -67,17 +66,13 @@ public interface FunctionDefinition {
@JsonIgnoreProperties(ignoreUnknown = true)
final class FileFunctionDefinition implements FunctionDefinition {
- private static final String FIELD_FILE_TYPE = "fileType";
- private static final String FIELD_STORAGE_PATHS = "storagePaths";
+ private static final String FIELD_FILE_RESOURCES = "fileResources";
private static final String FIELD_LANGUAGE = "language";
private static final String FIELD_CLASS_NAME = "className";
private static final String FIELD_FUNCTION_NAME = "functionName";
- @JsonProperty(FIELD_FILE_TYPE)
- private final String fileType;
-
- @JsonProperty(FIELD_STORAGE_PATHS)
- private final List<String> storagePaths;
+ @JsonProperty(FIELD_FILE_RESOURCES)
+ private final List<FunctionFileResource> fileResources;
@JsonProperty(FIELD_LANGUAGE)
private String language;
@@ -89,26 +84,19 @@ public interface FunctionDefinition {
private String functionName;
public FileFunctionDefinition(
- @JsonProperty(FIELD_FILE_TYPE) String fileType,
- @JsonProperty(FIELD_STORAGE_PATHS) List<String> storagePaths,
+ @JsonProperty(FIELD_FILE_RESOURCES) List<FunctionFileResource>
fileResources,
@JsonProperty(FIELD_LANGUAGE) String language,
@JsonProperty(FIELD_CLASS_NAME) String className,
@JsonProperty(FIELD_FUNCTION_NAME) String functionName) {
- this.fileType = fileType;
- this.storagePaths = storagePaths;
+ this.fileResources = fileResources;
this.language = language;
this.className = className;
this.functionName = functionName;
}
- @JsonGetter(FIELD_FILE_TYPE)
- public String fileType() {
- return fileType;
- }
-
- @JsonGetter(FIELD_STORAGE_PATHS)
- public List<String> storagePaths() {
- return storagePaths;
+ @JsonGetter(FIELD_FILE_RESOURCES)
+ public List<FunctionFileResource> fileResources() {
+ return fileResources;
}
@JsonGetter(FIELD_LANGUAGE)
@@ -135,8 +123,7 @@ public interface FunctionDefinition {
return false;
}
FileFunctionDefinition that = (FileFunctionDefinition) o;
- return fileType.equals(that.fileType)
- && Objects.equals(storagePaths, that.storagePaths)
+ return Objects.equals(fileResources, that.fileResources)
&& Objects.equals(language, that.language)
&& Objects.equals(className, that.className)
&& Objects.equals(functionName, that.functionName);
@@ -144,8 +131,8 @@ public interface FunctionDefinition {
@Override
public int hashCode() {
- int result = Objects.hash(fileType, language, className,
functionName);
- result = 31 * result + Objects.hashCode(storagePaths);
+ int result = Objects.hash(language, className, functionName);
+ result = 31 * result + Objects.hashCode(fileResources);
return result;
}
}
@@ -239,4 +226,49 @@ public interface FunctionDefinition {
private Types() {}
}
+
+ /** Function file resource. */
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ class FunctionFileResource {
+
+ private static final String FIELD_RESOURCE_TYPE = "resourceType";
+ private static final String FIELD_URI = "uri";
+
+ private final String resourceType;
+ private final String uri;
+
+ public FunctionFileResource(
+ @JsonProperty(FIELD_RESOURCE_TYPE) String resourceType,
+ @JsonProperty(FIELD_URI) String uri) {
+ this.resourceType = resourceType;
+ this.uri = uri;
+ }
+
+ @JsonGetter(FIELD_RESOURCE_TYPE)
+ public String resourceType() {
+ return resourceType;
+ }
+
+ @JsonGetter(FIELD_URI)
+ public String uri() {
+ return uri;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FunctionFileResource that = (FunctionFileResource) o;
+ return resourceType.equals(that.resourceType) &&
uri.equals(that.uri);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(resourceType, uri);
+ }
+ }
}
diff --git
a/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
b/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
index b0290ea50e..f617aa3a97 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
+++ b/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
@@ -59,6 +59,16 @@ public class FunctionImpl implements Function {
this.options = options;
}
+ public FunctionImpl(Identifier identifier, Map<String, FunctionDefinition>
definitions) {
+ this.identifier = identifier;
+ this.inputParams = null;
+ this.returnParams = null;
+ this.deterministic = true;
+ this.definitions = definitions;
+ this.comment = null;
+ this.options = null;
+ }
+
@Override
public String name() {
return identifier.getObjectName();
diff --git
a/paimon-api/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
b/paimon-api/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
index 94814d6d85..a3fe6c4950 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
@@ -20,7 +20,6 @@ package org.apache.paimon.rest.auth;
import org.apache.paimon.annotation.VisibleForTesting;
-import okhttp3.MediaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +50,7 @@ public class DLFAuthProvider implements AuthProvider {
public static final DateTimeFormatter AUTH_DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'");
- protected static final MediaType MEDIA_TYPE =
MediaType.parse("application/json");
+ protected static final String MEDIA_TYPE = "application/json";
@Nullable private final DLFTokenLoader tokenLoader;
@Nullable protected DLFToken token;
@@ -143,7 +142,7 @@ public class DLFAuthProvider implements AuthProvider {
signHeaders.put(DLF_CONTENT_SHA56_HEADER_KEY, DLF_CONTENT_SHA56_VALUE);
signHeaders.put(DLF_AUTH_VERSION_HEADER_KEY, DLFAuthSignature.VERSION);
if (data != null && !data.isEmpty()) {
- signHeaders.put(DLF_CONTENT_TYPE_KEY, MEDIA_TYPE.toString());
+ signHeaders.put(DLF_CONTENT_TYPE_KEY, MEDIA_TYPE);
signHeaders.put(DLF_CONTENT_MD5_HEADER_KEY,
DLFAuthSignature.md5(data));
}
if (securityToken != null) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
index 8284836a3b..bbb90e2f98 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
@@ -18,6 +18,11 @@
package org.apache.paimon.utils;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeJsonParser;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -70,4 +75,18 @@ public class ParameterUtils {
}
mapList.put(kv[0].trim(), valueList);
}
+
+ public static List<DataField> parseDataFieldArray(String data) {
+ List<DataField> list = new ArrayList<>();
+ if (data != null) {
+ JsonNode jsonArray = JsonSerdeUtil.fromJson(data, JsonNode.class);
+ if (jsonArray.isArray()) {
+ for (JsonNode objNode : jsonArray) {
+ DataField dataField =
DataTypeJsonParser.parseDataField(objNode);
+ list.add(dataField);
+ }
+ }
+ }
+ return list;
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
index acb7a8ff56..fe15ba5370 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
@@ -324,7 +324,11 @@ public class MockRESTMessage {
Lists.newArrayList(new DataField(0, "area",
DataTypes.DOUBLE()));
FunctionDefinition flinkFunction =
FunctionDefinition.file(
- "jar", Lists.newArrayList("/a/b/c.jar"), "java",
"className", "eval");
+ Lists.newArrayList(
+ new
FunctionDefinition.FunctionFileResource("jar", "/a/b/c.jar")),
+ "java",
+ "className",
+ "eval");
FunctionDefinition sparkFunction =
FunctionDefinition.lambda(
"(Double length, Double width) -> length * width",
"java");
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 07e338c10d..d183184850 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -903,7 +903,10 @@ public class RESTCatalogServer {
RESTApi.fromJson(data, AlterFunctionRequest.class);
HashMap<String, FunctionDefinition> newDefinitions =
new HashMap<>(function.definitions());
- Map<String, String> newOptions = new
HashMap<>(function.options());
+ Map<String, String> newOptions =
+ function.options() != null
+ ? new HashMap<>(function.options())
+ : new HashMap<>();
String newComment = function.comment();
for (FunctionChange functionChange : requestBody.changes()) {
if (functionChange instanceof
FunctionChange.SetFunctionOption) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
index 35780998c5..cbabd69037 100644
---
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
@@ -366,9 +366,7 @@ public class AuthProviderTest {
assertTrue(header.containsKey(DLF_DATE_HEADER_KEY));
assertEquals(
DLFAuthSignature.VERSION,
header.get(DLFAuthProvider.DLF_AUTH_VERSION_HEADER_KEY));
- assertEquals(
- DLFAuthProvider.MEDIA_TYPE.toString(),
- header.get(DLFAuthProvider.DLF_CONTENT_TYPE_KEY));
+ assertEquals(DLFAuthProvider.MEDIA_TYPE,
header.get(DLFAuthProvider.DLF_CONTENT_TYPE_KEY));
assertEquals(
DLFAuthSignature.md5(data),
header.get(DLFAuthProvider.DLF_CONTENT_MD5_HEADER_KEY));
assertEquals(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index ed4f3b8242..10f437e8bd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -29,6 +29,9 @@ import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
+import org.apache.paimon.function.FunctionChange;
+import org.apache.paimon.function.FunctionDefinition;
+import org.apache.paimon.function.FunctionImpl;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
@@ -49,11 +52,14 @@ import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewImpl;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionImpl;
@@ -61,6 +67,7 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
@@ -89,6 +96,7 @@ import
org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
@@ -101,6 +109,8 @@ import
org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,6 +178,7 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
public class FlinkCatalog extends AbstractCatalog {
public static final String DIALECT = "flink";
+ public static final String FUNCTION_DEFINITION_NAME = "flink";
private static final Logger LOG =
LoggerFactory.getLogger(FlinkCatalog.class);
@@ -1460,40 +1471,124 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public final List<String> listFunctions(String dbName) throws
CatalogException {
- return Collections.emptyList();
+ try {
+ return catalog.listFunctions(dbName);
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new CatalogException(e.getMessage(), e);
+ }
}
@Override
public final CatalogFunction getFunction(ObjectPath functionPath)
throws FunctionNotExistException, CatalogException {
- throw new FunctionNotExistException(getName(), functionPath);
+ try {
+ org.apache.paimon.function.Function function =
+ catalog.getFunction(toIdentifier(functionPath));
+ FunctionDefinition functionDefinition =
function.definition(FUNCTION_DEFINITION_NAME);
+            // currently only file functions are supported, so check the definition type
+ if (functionDefinition instanceof
FunctionDefinition.FileFunctionDefinition) {
+ FunctionDefinition.FileFunctionDefinition
fileFunctionDefinition =
+ (FunctionDefinition.FileFunctionDefinition)
functionDefinition;
+ List<ResourceUri> resourceUris =
+ fileFunctionDefinition.fileResources().stream()
+ .map(
+ resource ->
+ new ResourceUri(
+ ResourceType.valueOf(
+
resource.resourceType()),
+ resource.uri()))
+ .collect(Collectors.toList());
+ return new CatalogFunctionImpl(
+ fileFunctionDefinition.className(),
+
FunctionLanguage.valueOf(fileFunctionDefinition.language()),
+ resourceUris);
+ }
+ throw new FunctionNotExistException(getName(), functionPath);
+ } catch (Catalog.FunctionNotExistException e) {
+ throw new FunctionNotExistException(getName(), functionPath);
+ }
}
@Override
public final boolean functionExists(ObjectPath functionPath) throws
CatalogException {
- return false;
+ try {
+ return catalog.listFunctions(functionPath.getDatabaseName())
+ .contains(functionPath.getObjectName());
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new CatalogException(e.getMessage(), e);
+ }
}
@Override
public final void createFunction(
ObjectPath functionPath, CatalogFunction function, boolean
ignoreIfExists)
- throws CatalogException {
- throw new UnsupportedOperationException(
- "Create function is not supported,"
- + " maybe you can use 'CREATE TEMPORARY FUNCTION'
instead.");
+ throws FunctionAlreadyExistException, CatalogException {
+ List<FunctionDefinition.FunctionFileResource> fileResources =
+ function.getFunctionResources().stream()
+ .map(
+ r ->
+ new
FunctionDefinition.FunctionFileResource(
+ r.getResourceType().name(),
r.getUri()))
+ .collect(Collectors.toList());
+ FunctionDefinition functionDefinition =
+ FunctionDefinition.file(
+ fileResources,
+ function.getFunctionLanguage().name(),
+ function.getClassName(),
+ functionPath.getObjectName());
+ Map<String, FunctionDefinition> definitions = new HashMap<>();
+ definitions.put(FUNCTION_DEFINITION_NAME, functionDefinition);
+ org.apache.paimon.function.Function paimonFunction =
+ new FunctionImpl(toIdentifier(functionPath), definitions);
+ try {
+ catalog.createFunction(toIdentifier(functionPath), paimonFunction,
ignoreIfExists);
+ } catch (Catalog.FunctionAlreadyExistException |
Catalog.DatabaseNotExistException e) {
+ throw new FunctionAlreadyExistException(getName(), functionPath);
+ }
}
@Override
public final void alterFunction(
ObjectPath functionPath, CatalogFunction newFunction, boolean
ignoreIfNotExists)
- throws CatalogException {
- throw new UnsupportedOperationException();
+ throws FunctionNotExistException, CatalogException {
+ try {
+ org.apache.paimon.function.Function function =
+ catalog.getFunction(toIdentifier(functionPath));
+ FunctionDefinition.FileFunctionDefinition functionDefinition =
+ (FunctionDefinition.FileFunctionDefinition)
+ function.definition(FUNCTION_DEFINITION_NAME);
+ if (functionDefinition != null) {
+ FunctionDefinition newFunctionDefinition =
+ FunctionDefinition.file(
+ functionDefinition.fileResources(),
+ newFunction.getFunctionLanguage().name(),
+ newFunction.getClassName(),
+ functionPath.getObjectName());
+ FunctionChange functionChange =
+ FunctionChange.updateDefinition(
+ FUNCTION_DEFINITION_NAME,
newFunctionDefinition);
+ catalog.alterFunction(
+ toIdentifier(functionPath),
+ ImmutableList.of(functionChange),
+ ignoreIfNotExists);
+ }
+ } catch (Catalog.FunctionNotExistException e) {
+ throw new FunctionNotExistException(getName(), functionPath);
+ } catch (Catalog.DefinitionAlreadyExistException e) {
+ throw new RuntimeException(e);
+ } catch (Catalog.DefinitionNotExistException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
public final void dropFunction(ObjectPath functionPath, boolean
ignoreIfNotExists)
- throws CatalogException {
- throw new UnsupportedOperationException();
+ throws FunctionNotExistException, CatalogException {
+ try {
+ catalog.dropFunction(toIdentifier(functionPath),
ignoreIfNotExists);
+ } catch (Catalog.FunctionNotExistException e) {
+ throw new FunctionNotExistException(getName(), functionPath);
+ }
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterFunctionProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterFunctionProcedure.java
new file mode 100644
index 0000000000..bff94aa63e
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterFunctionProcedure.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.function.FunctionChange;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * alter function procedure. Usage:
+ *
+ * <pre><code>
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
+ * CALL sys.alter_function('function_identifier', '{"action" :
"addDefinition", "name" : "flink", "definition" : {"type" : "file",
"fileResources" : [{"resourceType": "JAR", "uri": "oss://mybucket/xxxx.jar"}],
"language": "JAVA", "className": "xxxx", "functionName": "functionName" } }')
+ *
+ * </code></pre>
+ */
+public class AlterFunctionProcedure extends ProcedureBase {
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "function", type =
@DataTypeHint("STRING")),
+ @ArgumentHint(name = "change", type = @DataTypeHint("STRING")),
+ })
+ public String[] call(ProcedureContext procedureContext, String function,
String change)
+ throws Catalog.FunctionNotExistException,
Catalog.DefinitionAlreadyExistException,
+ Catalog.DefinitionNotExistException {
+ Identifier identifier = Identifier.fromString(function);
+ FunctionChange functionChange = JsonSerdeUtil.fromJson(change,
FunctionChange.class);
+ catalog.alterFunction(identifier, ImmutableList.of(functionChange),
false);
+ return new String[] {"Success"};
+ }
+
+ @Override
+ public String identifier() {
+ return "alter_function";
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateFunctionProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateFunctionProcedure.java
new file mode 100644
index 0000000000..77f2d631fa
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateFunctionProcedure.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.function.FunctionImpl;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Optional;
+
+/**
+ * create function procedure. Usage:
+ *
+ * <pre><code>
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
+ * CALL sys.create_function('function_identifier',
+ * '[{"id": 0, "name":"length", "type":"INT"}', '{"id": 1, "name":"width",
"type":"INT"}]',
+ * '[{"id": 0, "name":"area", "type":"BIGINT"]',
+ * true, 'comment'
+ * )
+ *
+ * </code></pre>
+ */
+public class CreateFunctionProcedure extends ProcedureBase {
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "function", type =
@DataTypeHint("STRING")),
+ @ArgumentHint(name = "inputParams", type =
@DataTypeHint("STRING")),
+ @ArgumentHint(name = "returnParams", type =
@DataTypeHint("STRING")),
+ @ArgumentHint(
+ name = "deterministic",
+ type = @DataTypeHint("BOOLEAN"),
+ isOptional = true),
+ @ArgumentHint(name = "comment", type =
@DataTypeHint("STRING"), isOptional = true),
+ @ArgumentHint(name = "options", type =
@DataTypeHint("STRING"), isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String function,
+ String inputParams,
+ String returnParams,
+ Boolean deterministic,
+ String comment,
+ String options)
+ throws Catalog.FunctionAlreadyExistException,
Catalog.DatabaseNotExistException {
+ Identifier identifier = Identifier.fromString(function);
+ FunctionImpl functionImpl =
+ new FunctionImpl(
+ identifier,
+ ParameterUtils.parseDataFieldArray(inputParams),
+ ParameterUtils.parseDataFieldArray(returnParams),
+ Optional.ofNullable(deterministic).orElse(true),
+ Maps.newHashMap(),
+ comment,
+ ParameterUtils.parseCommaSeparatedKeyValues(options));
+ catalog.createFunction(identifier, functionImpl, false);
+ return new String[] {"Success"};
+ }
+
+ @Override
+ public String identifier() {
+ return "create_function";
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropFunctionProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropFunctionProcedure.java
new file mode 100644
index 0000000000..6cf957fa96
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropFunctionProcedure.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * drop function procedure. Usage:
+ *
+ * <pre><code>
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
+ * CALL sys.drop_function('function_identifier')
+ *
+ * </code></pre>
+ */
+public class DropFunctionProcedure extends ProcedureBase {
+ @ProcedureHint(argument = {@ArgumentHint(name = "function", type =
@DataTypeHint("STRING"))})
+ public String[] call(ProcedureContext procedureContext, String function)
+ throws Catalog.FunctionNotExistException {
+ Identifier identifier = Identifier.fromString(function);
+ catalog.dropFunction(identifier, false);
+ return new String[] {"Success"};
+ }
+
+ @Override
+ public String identifier() {
+ return "drop_function";
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index b344ea56af..df615478c5 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -92,3 +92,6 @@ org.apache.paimon.flink.procedure.ClearConsumersProcedure
org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure
org.apache.paimon.flink.procedure.RescaleProcedure
org.apache.paimon.flink.procedure.AlterViewDialectProcedure
+org.apache.paimon.flink.procedure.CreateFunctionProcedure
+org.apache.paimon.flink.procedure.DropFunctionProcedure
+org.apache.paimon.flink.procedure.AlterFunctionProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 3ec704a10c..7742bbb3f3 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -21,8 +21,16 @@ package org.apache.paimon.flink;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.rest.RESTToken;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
@@ -102,4 +110,44 @@ class RESTCatalogITCase extends RESTCatalogITCaseBase {
assertThat(batchSql(String.format("SELECT * FROM %s.%s",
DATABASE_NAME, TABLE_NAME)))
.containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2",
22.0D));
}
+
+ @Test
+ public void testFunction() throws Exception {
+ Catalog catalog = tEnv.getCatalog("PAIMON").get();
+ String functionName = "test_str2";
+ String identifier = "com.streaming.flink.udf.StrUdf";
+ String jarResourcePath = "xxxx.jar";
+ String jarResourcePath2 = "xxxx-yyyyy.jar";
+ CatalogFunctionImpl function =
+ new CatalogFunctionImpl(
+ identifier,
+ FunctionLanguage.JAVA,
+ ImmutableList.of(
+ new ResourceUri(ResourceType.JAR,
jarResourcePath),
+ new ResourceUri(ResourceType.JAR,
jarResourcePath2)));
+ sql(
+ String.format(
+ "CREATE FUNCTION %s.%s AS '%s' LANGUAGE %s USING %s
'%s', %s '%s'",
+ DATABASE_NAME,
+ functionName,
+ function.getClassName(),
+ function.getFunctionLanguage(),
+ ResourceType.JAR,
+ jarResourcePath,
+ ResourceType.JAR,
+ jarResourcePath2));
+ assertThat(batchSql(String.format("SHOW
FUNCTIONS"))).contains(Row.of(functionName));
+ ObjectPath functionObjectPath = new ObjectPath(DATABASE_NAME,
functionName);
+ CatalogFunction getFunction = catalog.getFunction(functionObjectPath);
+ assertThat(getFunction).isEqualTo(function);
+ identifier = "com.streaming.flink.udf.StrUdf2";
+ sql(
+ String.format(
+ "ALTER FUNCTION PAIMON.%s.%s AS '%s' LANGUAGE %s",
+ DATABASE_NAME, functionName, identifier,
function.getFunctionLanguage()));
+ getFunction = catalog.getFunction(functionObjectPath);
+ assertThat(getFunction.getClassName()).isEqualTo(identifier);
+ sql(String.format("DROP FUNCTION %s.%s", DATABASE_NAME, functionName));
+ assertThat(catalog.functionExists(functionObjectPath)).isFalse();
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FunctionProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FunctionProcedureITCase.java
new file mode 100644
index 0000000000..33ba3e9a3f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FunctionProcedureITCase.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.RESTCatalogITCaseBase;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for function procedure. */
+public class FunctionProcedureITCase extends RESTCatalogITCaseBase {
+
+ @Test
+ public void test() throws FunctionNotExistException {
+ String functionName = "test_function";
+ List<Row> result =
+ sql(
+ String.format(
+ "CALL sys.create_function('%s.%s', "
+ + "'[{\"id\": 0, \"name\":\"length\",
\"type\":\"INT\"}, {\"id\": 1, \"name\":\"width\", \"type\":\"INT\"}]',"
+ + "'[{\"id\": 0, \"name\":\"area\",
\"type\":\"BIGINT\"}]', true, 'comment', 'k1=v1,k2=v2')",
+ DATABASE_NAME, functionName));
+ assertThat(result.toString()).contains("Success");
+ assertThat(batchSql(String.format("SHOW
FUNCTIONS"))).contains(Row.of(functionName));
+ result =
+ sql(
+ String.format(
+ "CALL sys.alter_function('%s.%s', '{\"action\"
: \"addDefinition\", \"name\" : \"flink\", \"definition\" : {\"type\" :
\"file\", \"fileResources\" : [{\"resourceType\": \"JAR\", \"uri\":
\"oss://mybucket/xxxx.jar\"}], \"language\": \"JAVA\", \"className\": \"xxxx\",
\"functionName\": \"functionName\" } }')",
+ DATABASE_NAME, functionName));
+ assertThat(result.toString()).contains("Success");
+ Catalog catalog = tEnv.getCatalog("PAIMON").get();
+ ObjectPath functionObjectPath = new ObjectPath(DATABASE_NAME,
functionName);
+ assertThat(batchSql(String.format("SHOW
FUNCTIONS"))).contains(Row.of(functionName));
+ CatalogFunction getFunction = catalog.getFunction(functionObjectPath);
+ assertThat(getFunction.getClassName()).isEqualTo("xxxx");
+ result = sql(String.format("CALL sys.drop_function('%s.%s')",
DATABASE_NAME, functionName));
+ assertThat(result.toString()).contains("Success");
+ assertThat(batchSql(String.format("SHOW
FUNCTIONS"))).doesNotContain(Row.of(functionName));
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/JavaLambdaStringToMethodConverter.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/JavaLambdaStringToMethodConverter.java
new file mode 100644
index 0000000000..baf443cd5b
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/JavaLambdaStringToMethodConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileObject;
+import javax.tools.StandardJavaFileManager;
+import javax.tools.ToolProvider;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Util class for compiling and loading Java lambda expressions as methods. */
+public class JavaLambdaStringToMethodConverter {
+ private static final Pattern LAMBDA_PATTERN =
+ Pattern.compile("\\s*\\(([^)]*)\\)\\s*->\\s*(.+)\\s*");
+
+ public static String getClassName(String functionName) {
+ return "PaimonLambdaFunction" + functionName;
+ }
+
+ public static String getSourceFileName(String functionName) {
+ return String.format("%s.java", getClassName(functionName));
+ }
+
+ public static Method compileAndLoadMethod(
+ String functionName, String lambdaExpression, String returnType)
throws Exception {
+ String className = getClassName(functionName);
+ String fullMethod = parseLambdaWithType(returnType, lambdaExpression,
"apply");
+
+ String javaCode = "public class " + className + " { " + fullMethod + "
}";
+
+ File sourceFile = new File(getSourceFileName(functionName));
+ try (FileWriter writer = new FileWriter(sourceFile)) {
+ writer.write(javaCode);
+ }
+
+ JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+ StandardJavaFileManager fileManager =
compiler.getStandardFileManager(null, null, null);
+ Iterable<? extends JavaFileObject> compilationUnits =
+
fileManager.getJavaFileObjectsFromFiles(Arrays.asList(sourceFile));
+ compiler.getTask(null, fileManager, null, null, null,
compilationUnits).call();
+ fileManager.close();
+ URLClassLoader classLoader =
+ URLClassLoader.newInstance(new URL[] {new
File(".").toURI().toURL()});
+ Class<?> compiledClass = Class.forName(className, true, classLoader);
+ return compiledClass.getDeclaredMethods()[0];
+ }
+
+ public static String parseLambdaWithType(
+ String outputType, String expression, String methodName) {
+ Matcher matcher = LAMBDA_PATTERN.matcher(expression);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Invalid lambda expression: " +
expression);
+ }
+
+ String parameters = matcher.group(1).trim();
+ String body = matcher.group(2).trim();
+
+ StringBuilder method = new StringBuilder();
+ method.append("public static ")
+ .append(outputType)
+ .append(" ")
+ .append(methodName)
+ .append("(")
+ .append(parameters)
+ .append(")");
+
+ if (!body.startsWith("{")) {
+ method.append("{");
+ }
+ if (!body.contains("return ")) {
+ method.append("return ");
+ }
+ if (!body.contains(";")) {
+ method.append(body).append(";");
+ } else {
+ method.append(body);
+ }
+ if (!body.endsWith("}")) {
+ method.append("}");
+ }
+ return method.toString();
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/LambdaScalarFunction.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/LambdaScalarFunction.java
new file mode 100644
index 0000000000..5621df0cff
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/LambdaScalarFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Paimon Spark Java lambda scalar function. */
+public class LambdaScalarFunction implements UnboundFunction, Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private DataType outputType;
+ private String javaType;
+ private String functionName;
+ private String lambdaExpression;
+
+ public LambdaScalarFunction(
+ String functionName, DataType outputType, String javaType, String
lambdaExpression) {
+        this.outputType = outputType;
+        this.javaType = javaType;
+        this.functionName = functionName;
+        this.lambdaExpression = lambdaExpression;
+ }
+
+ @Override
+ public BoundFunction bind(StructType inputType) {
+ try {
+ List<DataType> inputTypes = new ArrayList<>();
+ for (StructField field : inputType.fields()) {
+ inputTypes.add(field.dataType());
+ }
+ return new PaimonSparkScalarFunction(
+ functionName, inputTypes, outputType, javaType,
this.lambdaExpression);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String description() {
+ return "";
+ }
+
+ @Override
+ public String name() {
+ return functionName;
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/PaimonSparkScalarFunction.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/PaimonSparkScalarFunction.java
new file mode 100644
index 0000000000..1a90b72569
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/PaimonSparkScalarFunction.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.spark.utils.CatalogUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.types.DataType;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Paimon {@link ScalarFunction} for Spark that evaluates a compiled Java lambda. */
+public class PaimonSparkScalarFunction implements ScalarFunction<Object>, Serializable {
+    private static final long serialVersionUID = 1L;
+    private final String functionName;
+    private final List<DataType> inputTypes;
+    private final DataType resultType;
+    private final String javaType;
+    private final String lambdaExpression;
+    private transient Method compiledMethod;
+
+ public PaimonSparkScalarFunction(
+ String functionName,
+ List<DataType> inputTypes,
+ DataType resultType,
+ String javaType,
+ String lambdaExpression) {
+ this.functionName = functionName;
+ this.inputTypes = inputTypes;
+ this.resultType = resultType;
+ this.javaType = javaType;
+ this.lambdaExpression = lambdaExpression;
+ }
+
+    @Override
+    public DataType[] inputTypes() {
+        return inputTypes.toArray(new DataType[0]);
+    }
+
+ @Override
+ public DataType resultType() {
+ return resultType;
+ }
+
+    @Override
+    public Object produceResult(InternalRow input) {
+        try {
+            if (this.compiledMethod == null) {
+                // Method is not serializable, so the lambda is compiled lazily on
+                // first invocation after deserialization on the executor.
+                this.compiledMethod =
+                        JavaLambdaStringToMethodConverter.compileAndLoadMethod(
+                                functionName, lambdaExpression, javaType);
+            }
+            List<Object> parameters = new ArrayList<>();
+            for (int i = 0; i < inputTypes().length; i++) {
+                Object obj =
+                        CatalogUtils.convertSparkJavaToPaimonJava(
+                                inputTypes()[i], input.get(i, inputTypes()[i]));
+                parameters.add(obj);
+            }
+            return this.compiledMethod.invoke(null, parameters.toArray(new Object[0]));
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to evaluate lambda function " + functionName, e);
+        }
+    }
+
+ @Override
+ public String name() {
+ return functionName;
+ }
+
+ @Override
+ public String canonicalName() {
+ return functionName;
+ }
+}
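
Why compiledMethod is transient: java.lang.reflect.Method is not serializable, so only the lambda source string ships to the executors, and the method is re-resolved on first call there. A self-contained sketch of the same lazy re-resolution pattern, using a plain JDK method in place of the generated lambda class (illustrative only):

    import java.io.Serializable;
    import java.lang.reflect.Method;

    public class LazyMethodHolder implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String className = "java.lang.Math";
        private final String methodName = "max";
        // Not serializable, hence re-resolved after deserialization.
        private transient Method method;

        public Object invoke(int a, int b) throws Exception {
            if (method == null) {
                method = Class.forName(className).getMethod(methodName, int.class, int.class);
            }
            return method.invoke(null, a, b);
        }
    }
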
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index c323d86dfd..40d07d906a 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -23,25 +23,33 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.function.Function;
+import org.apache.paimon.function.FunctionDefinition;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.spark.catalog.SparkBaseCatalog;
-import org.apache.paimon.spark.catalog.SupportFunction;
import org.apache.paimon.spark.catalog.SupportView;
+import org.apache.paimon.spark.catalog.functions.PaimonFunctions;
+import org.apache.paimon.spark.utils.CatalogUtils;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.FormatTableOptions;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.TypeUtils;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.IdentityTransform;
import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -81,10 +89,12 @@ import static
org.apache.paimon.spark.utils.CatalogUtils.removeCatalogName;
import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
 /** Spark {@link TableCatalog} for paimon. */
-public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, SupportView {
+public class SparkCatalog extends SparkBaseCatalog
+        implements SupportView, FunctionCatalog, SupportsNamespaces {
-    private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);
+    public static final String FUNCTION_DEFINITION_NAME = "spark";
+    private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);
private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
protected Catalog catalog = null;
@@ -550,6 +560,75 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction, S
}
}
+    @Override
+    public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+        if (isFunctionNamespace(namespace)) {
+            List<Identifier> functionIdentifiers = new ArrayList<>();
+            PaimonFunctions.names()
+                    .forEach(name -> functionIdentifiers.add(Identifier.of(namespace, name)));
+            if (namespace.length > 0) {
+                String databaseName = getDatabaseNameFromNamespace(namespace);
+                try {
+                    catalog.listFunctions(databaseName)
+                            .forEach(
+                                    name ->
+                                            functionIdentifiers.add(
+                                                    Identifier.of(namespace, name)));
+                } catch (Catalog.DatabaseNotExistException e) {
+                    throw new NoSuchNamespaceException(namespace);
+                }
+            }
+            return functionIdentifiers.toArray(new Identifier[0]);
+        }
+        throw new NoSuchNamespaceException(namespace);
+    }
+
+    @Override
+    public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+        if (isFunctionNamespace(ident.namespace())) {
+            UnboundFunction func = PaimonFunctions.load(ident.name());
+            if (func != null) {
+                return func;
+            }
+            try {
+                Function paimonFunction = catalog.getFunction(toIdentifier(ident));
+                FunctionDefinition functionDefinition =
+                        paimonFunction.definition(FUNCTION_DEFINITION_NAME);
+                if (functionDefinition instanceof FunctionDefinition.LambdaFunctionDefinition) {
+                    FunctionDefinition.LambdaFunctionDefinition lambdaFunctionDefinition =
+                            (FunctionDefinition.LambdaFunctionDefinition) functionDefinition;
+                    if (paimonFunction.returnParams().isPresent()) {
+                        List<DataField> dataFields = paimonFunction.returnParams().get();
+                        if (dataFields.size() == 1) {
+                            DataField dataField = dataFields.get(0);
+                            return new LambdaScalarFunction(
+                                    ident.name(),
+                                    CatalogUtils.paimonType2SparkType(dataField.type()),
+                                    CatalogUtils.paimonType2JavaType(dataField.type()),
+                                    lambdaFunctionDefinition.definition());
+                        } else {
+                            throw new UnsupportedOperationException(
+                                    "returnParams size > 1 is not supported");
+                        }
+                    }
+                }
+            } catch (Catalog.FunctionNotExistException e) {
+                throw new NoSuchFunctionException(ident);
+            }
+        }
+
+        throw new NoSuchFunctionException(ident);
+    }
+
+    private boolean isFunctionNamespace(String[] namespace) {
+        // Allow for empty namespace, as Spark's bucket join will use the `bucket` function with
+        // empty namespace to generate transforms for partitioning.
+        // Otherwise, check if it is a paimon namespace.
+        return namespace.length == 0 || (namespace.length == 1 && namespaceExists(namespace));
+    }
+
private PropertyChange toPropertyChange(NamespaceChange change) {
if (change instanceof NamespaceChange.SetProperty) {
NamespaceChange.SetProperty set = (NamespaceChange.SetProperty)
change;
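
For context, loadFunction() above resolves functions registered through the Paimon catalog with a definition keyed by FUNCTION_DEFINITION_NAME. A condensed sketch of that registration, mirroring the REST test later in this patch (identifiers such as db2/area_func are illustrative):

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.function.Function;
    import org.apache.paimon.function.FunctionDefinition;
    import org.apache.paimon.function.FunctionImpl;
    import org.apache.paimon.spark.SparkCatalog;
    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataTypes;

    import java.util.Arrays;
    import java.util.Collections;

    public class RegisterFunctionSketch {
        public static void register(Catalog catalog) throws Exception {
            Identifier id = Identifier.create("db2", "area_func");
            FunctionDefinition def =
                    FunctionDefinition.lambda(
                            "(Integer length, Integer width) -> { return (long) length * width; }",
                            "JAVA");
            Function function =
                    new FunctionImpl(
                            id,
                            Arrays.asList(
                                    new DataField(0, "length", DataTypes.INT()),
                                    new DataField(1, "width", DataTypes.INT())),
                            Collections.singletonList(new DataField(0, "area", DataTypes.BIGINT())),
                            false,
                            Collections.singletonMap(SparkCatalog.FUNCTION_DEFINITION_NAME, def),
                            null,
                            null);
            catalog.createFunction(id, function, false);
            // SELECT paimon.db2.area_func(1, 2) now resolves via loadFunction().
        }
    }
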
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 53ae3758d5..4e2e30a0c9 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -18,15 +18,18 @@
package org.apache.paimon.spark;
+import org.apache.paimon.spark.procedure.AlterFunctionProcedure;
import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
import org.apache.paimon.spark.procedure.CompactManifestProcedure;
import org.apache.paimon.spark.procedure.CompactProcedure;
import org.apache.paimon.spark.procedure.CreateBranchProcedure;
+import org.apache.paimon.spark.procedure.CreateFunctionProcedure;
import org.apache.paimon.spark.procedure.CreateTagFromTimestampProcedure;
import org.apache.paimon.spark.procedure.CreateTagProcedure;
import org.apache.paimon.spark.procedure.DeleteBranchProcedure;
import org.apache.paimon.spark.procedure.DeleteTagProcedure;
+import org.apache.paimon.spark.procedure.DropFunctionProcedure;
import org.apache.paimon.spark.procedure.ExpirePartitionsProcedure;
import org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
import org.apache.paimon.spark.procedure.ExpireTagsProcedure;
@@ -102,6 +105,9 @@ public class SparkProcedures {
procedureBuilders.put("refresh_object_table",
RefreshObjectTableProcedure::builder);
procedureBuilders.put("clear_consumers",
ClearConsumersProcedure::builder);
procedureBuilders.put("alter_view_dialect",
AlterViewDialectProcedure::builder);
+ procedureBuilders.put("create_function",
CreateFunctionProcedure::builder);
+ procedureBuilders.put("alter_function",
AlterFunctionProcedure::builder);
+ procedureBuilders.put("drop_function", DropFunctionProcedure::builder);
return procedureBuilders.build();
}
}
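
Once registered above, the three procedures are reachable from Spark SQL via CALL. A hedged usage sketch (the function name and current database are illustrative; the JSON shapes follow the procedure javadocs and the Scala test at the end of this patch):

    import org.apache.spark.sql.SparkSession;

    public class FunctionProcedureUsageSketch {
        public static void run(SparkSession spark) {
            // Create a function in the session's current Paimon database.
            spark.sql(
                    "CALL sys.create_function('area_func', "
                            + "'[{\"id\": 0, \"name\":\"length\", \"type\":\"INT\"}, "
                            + "{\"id\": 1, \"name\":\"width\", \"type\":\"INT\"}]', "
                            + "'[{\"id\": 0, \"name\":\"area\", \"type\":\"BIGINT\"}]', "
                            + "true, 'comment', 'k1=v1,k2=v2')");
            // Drop it again.
            spark.sql("CALL sys.drop_function('area_func')");
        }
    }
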
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
deleted file mode 100644
index d3fc3528bb..0000000000
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.catalog;
-
-import org.apache.paimon.spark.catalog.functions.PaimonFunctions;
-
-import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
-import org.apache.spark.sql.connector.catalog.FunctionCatalog;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
-
-/** Catalog methods for working with Functions. */
-public interface SupportFunction extends FunctionCatalog, SupportsNamespaces {
-
- default boolean isFunctionNamespace(String[] namespace) {
- // Allow for empty namespace, as Spark's bucket join will use `bucket`
function with empty
- // namespace to generate transforms for partitioning.
- // Otherwise, check if it is paimon namespace.
- return namespace.length == 0 || (namespace.length == 1 &&
namespaceExists(namespace));
- }
-
- @Override
- default Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
- if (isFunctionNamespace(namespace)) {
- return PaimonFunctions.names().stream()
- .map(name -> Identifier.of(namespace, name))
- .toArray(Identifier[]::new);
- }
-
- throw new NoSuchNamespaceException(namespace);
- }
-
- @Override
- default UnboundFunction loadFunction(Identifier ident) throws
NoSuchFunctionException {
- if (isFunctionNamespace(ident.namespace())) {
- UnboundFunction func = PaimonFunctions.load(ident.name());
- if (func != null) {
- return func;
- }
- }
-
- throw new NoSuchFunctionException(ident);
- }
-}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterFunctionProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterFunctionProcedure.java
new file mode 100644
index 0000000000..daf2a1676f
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterFunctionProcedure.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.function.FunctionChange;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.spark.utils.CatalogUtils;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Alter function procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.alter_function('function_identifier', 'change_json')
+ * </code></pre>
+ */
+public class AlterFunctionProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("function", StringType),
+ ProcedureParameter.required("change", StringType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true,
Metadata.empty())
+ });
+
+ protected AlterFunctionProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Catalog paimonCatalog = ((WithPaimonCatalog)
tableCatalog()).paimonCatalog();
+ org.apache.spark.sql.connector.catalog.Identifier ident =
+ toIdentifier(args.getString(0), PARAMETERS[0].name());
+ Identifier function = CatalogUtils.toIdentifier(ident);
+ FunctionChange functionChange =
+ JsonSerdeUtil.fromJson(args.getString(1),
FunctionChange.class);
+ try {
+ paimonCatalog.alterFunction(function,
ImmutableList.of(functionChange), false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return new InternalRow[] {newInternalRow(true)};
+ }
+
+ public static ProcedureBuilder builder() {
+ return new Builder<AlterFunctionProcedure>() {
+ @Override
+ public AlterFunctionProcedure doBuild() {
+ return new AlterFunctionProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "AlterFunctionProcedure";
+ }
+}
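
The change argument is a JSON-serialized FunctionChange. A usage sketch with the addDefinition shape used by the procedure test later in this patch (the function name is illustrative):

    import org.apache.spark.sql.SparkSession;

    public class AlterFunctionUsageSketch {
        public static void run(SparkSession spark) {
            // Attach a Spark lambda definition to an existing function.
            spark.sql(
                    "CALL sys.alter_function('area_func', "
                            + "'{\"action\" : \"addDefinition\", \"name\" : \"spark\", "
                            + "\"definition\" : {\"type\" : \"lambda\", \"definition\" : "
                            + "\"(Integer length, Integer width) -> { return (long) length * width; }\", "
                            + "\"language\": \"JAVA\" } }')");
        }
    }
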
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateFunctionProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateFunctionProcedure.java
new file mode 100644
index 0000000000..a2c47739ee
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateFunctionProcedure.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.function.FunctionImpl;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.spark.utils.CatalogUtils;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Create function procedure. Usage:
+ *
+ * <pre><code>
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
+ * CALL sys.create_function('function_identifier',
+ *    '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, "name":"width", "type":"INT"}]',
+ *    '[{"id": 0, "name":"area", "type":"BIGINT"}]',
+ *    true, 'comment', 'k1=v1,k2=v2'
+ * )
+ *
+ * </code></pre>
+ */
+public class CreateFunctionProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("function", StringType),
+ ProcedureParameter.required("inputParams", StringType),
+ ProcedureParameter.required("returnParams", StringType),
+ ProcedureParameter.optional("deterministic", BooleanType),
+ ProcedureParameter.optional("comment", StringType),
+ ProcedureParameter.optional("options", StringType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true,
Metadata.empty())
+ });
+
+ protected CreateFunctionProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Catalog paimonCatalog = ((WithPaimonCatalog)
tableCatalog()).paimonCatalog();
+ org.apache.spark.sql.connector.catalog.Identifier ident =
+ toIdentifier(args.getString(0), PARAMETERS[0].name());
+ Identifier function = CatalogUtils.toIdentifier(ident);
+ List<DataField> inputParams = getDataFieldsFromArguments(1, args);
+ List<DataField> returnParams = getDataFieldsFromArguments(2, args);
+ boolean deterministic = args.isNullAt(3) ? true : args.getBoolean(3);
+ String comment = args.isNullAt(4) ? null : args.getString(4);
+ String properties = args.isNullAt(5) ? null : args.getString(5);
+ Map<String, String> options =
ParameterUtils.parseCommaSeparatedKeyValues(properties);
+ try {
+ FunctionImpl functionImpl =
+ new FunctionImpl(
+ function,
+ inputParams,
+ returnParams,
+ deterministic,
+ Maps.newHashMap(),
+ comment,
+ options);
+ paimonCatalog.createFunction(function, functionImpl, false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return new InternalRow[] {newInternalRow(true)};
+ }
+
+ public static ProcedureBuilder builder() {
+ return new Builder<CreateFunctionProcedure>() {
+ @Override
+ public CreateFunctionProcedure doBuild() {
+ return new CreateFunctionProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "CreateFunctionProcedure";
+ }
+
+ public static List<DataField> getDataFieldsFromArguments(int position,
InternalRow args) {
+ String data = args.isNullAt(position) ? null :
args.getString(position);
+ return ParameterUtils.parseDataFieldArray(data);
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropFunctionProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropFunctionProcedure.java
new file mode 100644
index 0000000000..2a63f6eff6
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropFunctionProcedure.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.spark.utils.CatalogUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Drop function procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.drop_function('function_identifier')
+ * </code></pre>
+ */
+public class DropFunctionProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {ProcedureParameter.required("function",
StringType)};
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true,
Metadata.empty())
+ });
+
+ protected DropFunctionProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Catalog paimonCatalog = ((WithPaimonCatalog)
tableCatalog()).paimonCatalog();
+ org.apache.spark.sql.connector.catalog.Identifier ident =
+ toIdentifier(args.getString(0), PARAMETERS[0].name());
+ Identifier function = CatalogUtils.toIdentifier(ident);
+ try {
+ paimonCatalog.dropFunction(function, false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return new InternalRow[] {newInternalRow(true)};
+ }
+
+ public static ProcedureBuilder builder() {
+ return new Builder<DropFunctionProcedure>() {
+ @Override
+ public DropFunctionProcedure doBuild() {
+ return new DropFunctionProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "DropFunctionProcedure";
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
index da2bcc10a2..ddda188d18 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
@@ -18,11 +18,41 @@
package org.apache.paimon.spark.utils;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.ByteType;
+import static org.apache.spark.sql.types.DataTypes.DoubleType;
+import static org.apache.spark.sql.types.DataTypes.FloatType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
+import static org.apache.spark.sql.types.DataTypes.ShortType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.TimestampType;
+import static org.apache.spark.sql.types.DataTypes.createArrayType;
/** Utils of catalog. */
public class CatalogUtils {
@@ -52,4 +82,170 @@ public class CatalogUtils {
return ident;
}
}
+
+ public static org.apache.spark.sql.types.DataType paimonType2SparkType(
+ org.apache.paimon.types.DataType type) {
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return StringType;
+ case BOOLEAN:
+ return BooleanType;
+ case BINARY:
+ case VARBINARY:
+ return BinaryType;
+ case TINYINT:
+ return ByteType;
+ case SMALLINT:
+ return ShortType;
+ case INTEGER:
+ return IntegerType;
+ case BIGINT:
+ return LongType;
+ case FLOAT:
+ return FloatType;
+ case DOUBLE:
+ return DoubleType;
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) type;
+ return DataTypes.createDecimalType(
+ decimalType.getPrecision(), decimalType.getScale());
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return TimestampType;
+ case ARRAY:
+ return createArrayType(paimonType2SparkType(((ArrayType)
type).getElementType()));
+ case MAP:
+ case MULTISET:
+ DataType keyType;
+ DataType valueType;
+ if (type instanceof MapType) {
+ keyType = ((MapType) type).getKeyType();
+ valueType = ((MapType) type).getValueType();
+ } else if (type instanceof MultisetType) {
+ keyType = ((MultisetType) type).getElementType();
+ valueType = new IntType();
+ } else {
+ throw new UnsupportedOperationException("Unsupported type:
" + type);
+ }
+ return DataTypes.createMapType(
+ paimonType2SparkType(keyType),
paimonType2SparkType(valueType));
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " +
type);
+ }
+ }
+
+ public static String paimonType2JavaType(org.apache.paimon.types.DataType
type) {
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return "java.lang.String";
+ case BOOLEAN:
+ return "java.lang.Boolean";
+ case BINARY:
+ case VARBINARY:
+ return "byte[]";
+ case TINYINT:
+ return "java.lang.Byte";
+ case SMALLINT:
+ return "java.lang.Short";
+ case INTEGER:
+ return "java.lang.Integer";
+ case BIGINT:
+ return "java.lang.Long";
+ case FLOAT:
+ return "java.lang.Float";
+ case DOUBLE:
+ return "java.lang.Double";
+ case DECIMAL:
+ return "java.math.BigDecimal";
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return "java.time.Instant";
+ case ARRAY:
+ return paimonType2JavaType(((ArrayType)
type).getElementType()) + "[]";
+ case MAP:
+ case MULTISET:
+ DataType keyType;
+ DataType valueType;
+ if (type instanceof MapType) {
+ keyType = ((MapType) type).getKeyType();
+ valueType = ((MapType) type).getValueType();
+ } else if (type instanceof MultisetType) {
+ keyType = ((MultisetType) type).getElementType();
+ valueType = new IntType();
+ } else {
+ throw new UnsupportedOperationException("Unsupported type:
" + type);
+ }
+ return String.format(
+ "java.util.Map<%s,%s>",
+ paimonType2JavaType(keyType),
paimonType2JavaType(valueType));
+ default:
+ throw new UnsupportedOperationException("Unsupported type: " +
type);
+ }
+ }
+
+ public static Object convertSparkJavaToPaimonJava(
+ org.apache.spark.sql.types.DataType sparkType, Object value) {
+ if (value == null) {
+ return null;
+ }
+ if (sparkType == StringType) {
+            return value.toString();
+ } else if (sparkType == BooleanType) {
+ return (Boolean) value;
+ } else if (sparkType == BinaryType) {
+ return (byte[]) value;
+ } else if (sparkType == ByteType) {
+ return (byte) value;
+ } else if (sparkType == ShortType) {
+ return (short) value;
+ } else if (sparkType == IntegerType) {
+ return (Integer) value;
+ } else if (sparkType == LongType) {
+ return (Long) value;
+ } else if (sparkType == FloatType) {
+ return (Float) value;
+ } else if (sparkType == DoubleType) {
+ return (Double) value;
+ } else if (sparkType instanceof
org.apache.spark.sql.types.DecimalType) {
+ return (java.math.BigDecimal) value;
+ } else if (sparkType instanceof org.apache.spark.sql.types.ArrayType) {
+ org.apache.spark.sql.types.ArrayType arrayType =
+ (org.apache.spark.sql.types.ArrayType) sparkType;
+ List<Object> list = new ArrayList<>();
+ if (value instanceof GenericArrayData) {
+ GenericArrayData genericArray = (GenericArrayData) value;
+ Object[] array = genericArray.array();
+ for (Object elem : array) {
+
list.add(convertSparkJavaToPaimonJava(arrayType.elementType(), elem));
+ }
+ return list;
+ } else {
+ throw new IllegalArgumentException("Unexpected array type: " +
value.getClass());
+ }
+ } else if (sparkType instanceof org.apache.spark.sql.types.MapType) {
+ org.apache.spark.sql.types.MapType mapType =
+ (org.apache.spark.sql.types.MapType) sparkType;
+ if (value instanceof ArrayBasedMapData) {
+ ArrayBasedMapData arrayBasedMapData = (ArrayBasedMapData)
value;
+ Map<Object, Object> sparkMap =
+ ArrayBasedMapData.toJavaMap(
+ arrayBasedMapData.keyArray().array(),
+ arrayBasedMapData.valueArray().array());
+ Map<Object, Object> javaMap = new HashMap<>();
+ for (Map.Entry<Object, Object> entry : sparkMap.entrySet()) {
+ Object key =
convertSparkJavaToPaimonJava(mapType.keyType(), entry.getKey());
+ Object val =
+ convertSparkJavaToPaimonJava(mapType.valueType(),
entry.getValue());
+ javaMap.put(key, val);
+ }
+ return javaMap;
+ } else {
+ throw new IllegalArgumentException("Unexpected array type: " +
value.getClass());
+ }
+ }
+
+ throw new IllegalArgumentException("Unsupported Spark data type: " +
sparkType);
+ }
}
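
A small illustration of the two mappings added here, matching the unit test that follows: paimonType2SparkType feeds the Spark-facing function signature, while paimonType2JavaType produces the parameter and return type names for the compiled lambda source:

    import org.apache.paimon.spark.utils.CatalogUtils;
    import org.apache.paimon.types.IntType;
    import org.apache.paimon.types.MapType;
    import org.apache.paimon.types.VarCharType;

    public class TypeMappingSketch {
        public static void main(String[] args) {
            MapType paimonMap = new MapType(new IntType(), new VarCharType());
            // Spark-facing signature type: MapType(IntegerType, StringType, ...)
            System.out.println(CatalogUtils.paimonType2SparkType(paimonMap));
            // Java type name for the generated lambda:
            // java.util.Map<java.lang.Integer,java.lang.String>
            System.out.println(CatalogUtils.paimonType2JavaType(paimonMap));
        }
    }
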
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/utils/CatalogUtilsTest.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/utils/CatalogUtilsTest.java
new file mode 100644
index 0000000000..103b1783cd
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/utils/CatalogUtilsTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.utils;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.spark.sql.types.DataTypes.createArrayType;
+import static org.apache.spark.sql.types.DataTypes.createMapType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CatalogUtils}. */
+public class CatalogUtilsTest {
+ @Test
+ void paimonType2SparkType() {
+ assertThat(CatalogUtils.paimonType2SparkType(new DecimalType(2, 1)))
+ .isEqualTo(DataTypes.createDecimalType(2, 1));
+ assertThat(CatalogUtils.paimonType2SparkType(new ArrayType(new
IntType())))
+
.isEqualTo(createArrayType(org.apache.spark.sql.types.DataTypes.IntegerType));
+ assertThat(CatalogUtils.paimonType2SparkType(new MapType(new
IntType(), new VarCharType())))
+ .isEqualTo(
+ createMapType(
+
org.apache.spark.sql.types.DataTypes.IntegerType,
+ DataTypes.StringType));
+ }
+
+ @Test
+ void paimonType2JavaType() {
+ assertThat(CatalogUtils.paimonType2JavaType(new DecimalType(2, 1)))
+ .isEqualTo("java.math.BigDecimal");
+ assertThat(CatalogUtils.paimonType2JavaType(new ArrayType(new
IntType())))
+ .isEqualTo("java.lang.Integer[]");
+ assertThat(CatalogUtils.paimonType2JavaType(new MapType(new IntType(),
new VarCharType())))
+
.isEqualTo("java.util.Map<java.lang.Integer,java.lang.String>");
+ }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 444fb1024d..488913f14a 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -41,8 +41,8 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
public class SparkCatalogWithHiveTest {
private static TestHiveMetastore testHiveMetastore;
-
private static final int PORT = 9087;
+ @TempDir java.nio.file.Path tempDir;
@BeforeAll
public static void startMetastore() {
@@ -55,7 +55,102 @@ public class SparkCatalogWithHiveTest {
testHiveMetastore.stop();
}
- @TempDir java.nio.file.Path tempDir;
+ @Test
+ public void testCreateFormatTable() {
+ try (SparkSession spark = createSessionBuilder().getOrCreate()) {
+ spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+ spark.sql("USE spark_catalog.my_db1");
+
+ // test orc table
+
+ spark.sql("CREATE TABLE IF NOT EXISTS table_orc (a INT, bb INT, c
STRING) USING orc");
+
+ assertThat(
+ spark.sql("SHOW TABLES").collectAsList().stream()
+ .map(s -> s.get(1))
+ .map(Object::toString))
+ .containsExactlyInAnyOrder("table_orc");
+
+ assertThat(
+ spark.sql("EXPLAIN EXTENDED SELECT * from
table_orc").collectAsList()
+ .stream()
+ .map(s -> s.get(0))
+ .map(Object::toString)
+ .filter(s -> s.contains("OrcScan"))
+ .count())
+ .isGreaterThan(0);
+
+            // TODO: There are some bugs with Spark CSV table options. In Spark 3.x, both
+            // reading and writing use the default delimiter ',' even if we specify another
+            // one. In Spark 4.x, reading is correct but writing is still wrong, so just
+            // skip setting it for now.
+ // test csv table
+
+ spark.sql(
+ "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c
STRING) USING csv OPTIONS ('field-delimiter' ',')");
+ spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2,
'2')").collect();
+ assertThat(spark.sql("DESCRIBE FORMATTED
table_csv").collectAsList().toString())
+ .contains("sep=,");
+ assertThat(
+ spark.sql("SELECT * FROM
table_csv").collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
+
+ // test json table
+
+ spark.sql("CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c
STRING) USING json");
+ spark.sql("INSERT INTO table_json VALUES(1, 1, '1'), (2, 2, '2')");
+ assertThat(
+ spark.sql("SELECT * FROM
table_json").collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
+ }
+ }
+
+ @Test
+ public void testSpecifyHiveConfDirInGenericCatalog() {
+ try (SparkSession spark =
+ createSessionBuilder()
+
.config("spark.sql.catalog.spark_catalog.hive-conf-dir", "nonExistentPath")
+ .config(
+ "spark.sql.catalog.spark_catalog",
+ SparkGenericCatalog.class.getName())
+ .getOrCreate()) {
+ assertThatThrownBy(() -> spark.sql("CREATE DATABASE my_db"))
+ .rootCause()
+ .isInstanceOf(FileNotFoundException.class)
+ .hasMessageContaining("nonExistentPath");
+ }
+ }
+
+ @Test
+ public void testCreateExternalTable() {
+ try (SparkSession spark = createSessionBuilder().getOrCreate()) {
+ String warehousePath =
spark.sparkContext().conf().get("spark.sql.warehouse.dir");
+ spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
+ spark.sql("USE spark_catalog.test_db");
+
+ // create hive external table
+ spark.sql("CREATE EXTERNAL TABLE external_table (a INT, bb INT, c
STRING)");
+
+ // drop hive external table
+ spark.sql("DROP TABLE external_table");
+
+ // file system table exists
+ assertThatCode(
+ () ->
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(
+ warehousePath,
+ String.format(
+ "%s.db/%s",
+ "test_db",
"external_table"))))
+ .doesNotThrowAnyException();
+ }
+ }
private SparkSession.Builder createSessionBuilder() {
Path warehousePath = new Path("file:" + tempDir.toString());
@@ -75,102 +170,4 @@ public class SparkCatalogWithHiveTest {
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
.master("local[2]");
}
-
- @Test
- public void testCreateFormatTable() {
- SparkSession spark = createSessionBuilder().getOrCreate();
- spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
- spark.sql("USE spark_catalog.my_db1");
-
- // test orc table
-
- spark.sql("CREATE TABLE IF NOT EXISTS table_orc (a INT, bb INT, c
STRING) USING orc");
-
- assertThat(
- spark.sql("SHOW TABLES").collectAsList().stream()
- .map(s -> s.get(1))
- .map(Object::toString))
- .containsExactlyInAnyOrder("table_orc");
-
- assertThat(
- spark.sql("EXPLAIN EXTENDED SELECT * from
table_orc").collectAsList()
- .stream()
- .map(s -> s.get(0))
- .map(Object::toString)
- .filter(s -> s.contains("OrcScan"))
- .count())
- .isGreaterThan(0);
-
- // todo: There are some bugs with Spark CSV table's options. In Spark
3.x, both reading and
- // writing using the default delimiter value ',' even if we specific
it. In Spark 4.x,
- // reading is correct, but writing is still incorrect, just skip
setting it for now.
- // test csv table
-
- spark.sql(
- "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c
STRING) USING csv OPTIONS ('field-delimiter' ',')");
- spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2,
'2')").collect();
- assertThat(spark.sql("DESCRIBE FORMATTED
table_csv").collectAsList().toString())
- .contains("sep=,");
- assertThat(
- spark.sql("SELECT * FROM
table_csv").collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
-
- // test json table
-
- spark.sql("CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c
STRING) USING json");
- spark.sql("INSERT INTO table_json VALUES(1, 1, '1'), (2, 2, '2')");
- assertThat(
- spark.sql("SELECT * FROM
table_json").collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
-
- spark.close();
- }
-
- @Test
- public void testSpecifyHiveConfDirInGenericCatalog() {
- SparkSession spark =
- createSessionBuilder()
-
.config("spark.sql.catalog.spark_catalog.hive-conf-dir", "nonExistentPath")
- .config(
- "spark.sql.catalog.spark_catalog",
- SparkGenericCatalog.class.getName())
- .getOrCreate();
- assertThatThrownBy(() -> spark.sql("CREATE DATABASE my_db"))
- .rootCause()
- .isInstanceOf(FileNotFoundException.class)
- .hasMessageContaining("nonExistentPath");
-
- spark.close();
- }
-
- @Test
- public void testCreateExternalTable() {
- SparkSession spark = createSessionBuilder().getOrCreate();
- String warehousePath =
spark.sparkContext().conf().get("spark.sql.warehouse.dir");
- spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
- spark.sql("USE spark_catalog.test_db");
-
- // create hive external table
- spark.sql("CREATE EXTERNAL TABLE external_table (a INT, bb INT, c
STRING)");
-
- // drop hive external table
- spark.sql("DROP TABLE external_table");
-
- // file system table exists
- assertThatCode(
- () ->
- FileStoreTableFactory.create(
- LocalFileIO.create(),
- new Path(
- warehousePath,
- String.format(
- "%s.db/%s", "test_db",
"external_table"))))
- .doesNotThrowAnyException();
-
- spark.close();
- }
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index 4502ba0122..575eb72711 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -18,6 +18,12 @@
package org.apache.paimon.spark;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.function.Function;
+import org.apache.paimon.function.FunctionChange;
+import org.apache.paimon.function.FunctionDefinition;
+import org.apache.paimon.function.FunctionImpl;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.rest.RESTCatalogInternalOptions;
import org.apache.paimon.rest.RESTCatalogServer;
@@ -25,16 +31,24 @@ import org.apache.paimon.rest.auth.AuthProvider;
import org.apache.paimon.rest.auth.AuthProviderEnum;
import org.apache.paimon.rest.auth.BearTokenAuthProvider;
import org.apache.paimon.rest.responses.ConfigResponse;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.CatalogManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@@ -48,6 +62,7 @@ public class SparkCatalogWithRestTest {
private String warehouse;
@TempDir java.nio.file.Path tempFile;
private String initToken = "init_token";
+ private SparkSession spark;
@BeforeEach
public void before() throws IOException {
@@ -65,16 +80,7 @@ public class SparkCatalogWithRestTest {
restCatalogServer = new RESTCatalogServer(dataPath, authProvider,
config, warehouse);
restCatalogServer.start();
serverUrl = restCatalogServer.getUrl();
- }
-
- @AfterEach()
- public void after() throws Exception {
- restCatalogServer.shutdown();
- }
-
- @Test
- public void testTable() {
- SparkSession spark =
+ spark =
SparkSession.builder()
.config("spark.sql.catalog.paimon",
SparkCatalog.class.getName())
.config("spark.sql.catalog.paimon.metastore", "rest")
@@ -89,9 +95,18 @@ public class SparkCatalogWithRestTest {
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
.master("local[2]")
.getOrCreate();
-
spark.sql("CREATE DATABASE paimon.db2");
spark.sql("USE paimon.db2");
+ }
+
+ @AfterEach()
+ public void after() throws Exception {
+ restCatalogServer.shutdown();
+ spark.close();
+ }
+
+ @Test
+ public void testTable() {
spark.sql(
"CREATE TABLE t1 (a INT, b INT, c STRING) TBLPROPERTIES"
+ " ('primary-key'='a', 'bucket'='4',
'file.format'='avro')");
@@ -102,6 +117,141 @@ public class SparkCatalogWithRestTest {
.containsExactlyInAnyOrder("t1");
spark.sql("DROP TABLE t1");
assertThat(spark.sql("SHOW TABLES").collectAsList().size() == 0);
- spark.close();
+ }
+
+ @Test
+ public void testFunction() throws Exception {
+ List<DataField> inputParams = new ArrayList<>();
+ Catalog paimonCatalog = getPaimonCatalog();
+ inputParams.add(new DataField(0, "length", DataTypes.INT()));
+ inputParams.add(new DataField(1, "width", DataTypes.INT()));
+ List<DataField> returnParams = new ArrayList<>();
+ returnParams.add(new DataField(0, "area", DataTypes.BIGINT()));
+ String functionName = "area_func";
+ FunctionDefinition definition =
+ FunctionDefinition.lambda(
+ "(Integer length, Integer width) -> { return (long)
length * width; }",
+ "JAVA");
+ Identifier identifier = Identifier.create("db2", functionName);
+ Function function =
+ new FunctionImpl(
+ identifier,
+ inputParams,
+ returnParams,
+ false,
+ ImmutableMap.of(SparkCatalog.FUNCTION_DEFINITION_NAME,
definition),
+ null,
+ null);
+ paimonCatalog.createFunction(identifier, function, false);
+ assertThat(
+ spark.sql(String.format("select paimon.db2.%s(1, 2)",
functionName))
+ .collectAsList()
+ .get(0)
+ .toString())
+ .isEqualTo("[2]");
+ definition =
+ FunctionDefinition.lambda(
+ "(Integer length, Integer width) -> { return length *
width + 1L; }",
+ "JAVA");
+ paimonCatalog.alterFunction(
+ identifier,
+ ImmutableList.of(
+ FunctionChange.updateDefinition(
+ SparkCatalog.FUNCTION_DEFINITION_NAME,
definition)),
+ false);
+ assertThat(
+ spark.sql(String.format("select paimon.db2.%s(1, 2)",
functionName))
+ .collectAsList()
+ .get(0)
+ .toString())
+ .isEqualTo("[3]");
+ paimonCatalog.dropFunction(identifier, false);
+ cleanFunction(functionName);
+ }
+
+ @Test
+ public void testArrayFunction() throws Exception {
+ List<DataField> inputParams = new ArrayList<>();
+ Catalog paimonCatalog = getPaimonCatalog();
+ inputParams.add(new DataField(0, "x",
DataTypes.ARRAY(DataTypes.INT())));
+ List<DataField> returnParams = new ArrayList<>();
+ returnParams.add(new DataField(0, "y", DataTypes.INT()));
+ String functionName = "test";
+ Identifier identifier = Identifier.create("db2", functionName);
+ FunctionDefinition definition =
+ FunctionDefinition.lambda(
+ "(java.util.List<java.util.List<Integer>> x) ->
x.size()", "JAVA");
+ Function function =
+ new FunctionImpl(
+ identifier,
+ inputParams,
+ returnParams,
+ false,
+ ImmutableMap.of(SparkCatalog.FUNCTION_DEFINITION_NAME,
definition),
+ null,
+ null);
+ paimonCatalog.createFunction(identifier, function, false);
+ assertThat(
+ spark.sql(
+ String.format(
+ "select
paimon.db2.%s(array(array(1, 2, 3), array(1, 2, 3)))",
+ functionName))
+ .collectAsList()
+ .get(0)
+ .toString())
+ .isEqualTo("[2]");
+ cleanFunction(functionName);
+ }
+
+ @Test
+ public void testMapFunction() throws Exception {
+ List<DataField> inputParams = new ArrayList<>();
+ Catalog paimonCatalog = getPaimonCatalog();
+ inputParams.add(new DataField(0, "x", DataTypes.MAP(DataTypes.INT(),
DataTypes.INT())));
+ List<DataField> returnParams = new ArrayList<>();
+ returnParams.add(new DataField(0, "y", DataTypes.INT()));
+ String functionName = "test";
+ Identifier identifier = Identifier.create("db2", functionName);
+ FunctionDefinition definition =
+ FunctionDefinition.lambda(
+ "(java.util.Map<Integer, Integer> x) -> x.size()",
"JAVA");
+ Function function =
+ new FunctionImpl(
+ identifier,
+ inputParams,
+ returnParams,
+ false,
+ ImmutableMap.of(SparkCatalog.FUNCTION_DEFINITION_NAME,
definition),
+ null,
+ null);
+ paimonCatalog.createFunction(identifier, function, false);
+ assertThat(
+ spark.sql(String.format("select paimon.db2.%s(map(1,
2))", functionName))
+ .collectAsList()
+ .get(0)
+ .toString())
+ .isEqualTo("[1]");
+ cleanFunction(functionName);
+ }
+
+ private Catalog getPaimonCatalog() {
+ CatalogManager catalogManager = spark.sessionState().catalogManager();
+ WithPaimonCatalog withPaimonCatalog = (WithPaimonCatalog)
catalogManager.currentCatalog();
+ return withPaimonCatalog.paimonCatalog();
+ }
+
+    private void cleanFunction(String functionName) {
+        // Delete the lambda source file generated next to the test classes.
+        String functionFilePath =
+                SparkCatalogWithRestTest.class
+                        .getProtectionDomain()
+                        .getCodeSource()
+                        .getLocation()
+                        .getPath()
+                        .replaceAll(
+                                "target/test-classes/",
+                                JavaLambdaStringToMethodConverter.getSourceFileName(
+                                        functionName));
+        new File(functionFilePath).delete();
+    }
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
new file mode 100644
index 0000000000..b52022b89a
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonRestCatalogSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
+
+/** Test for Function Procedure. */
+class FunctionProcedureTest extends PaimonRestCatalogSparkTestBase {
+ test(s"test function procedure") {
+ val functionName = "function_test"
+ checkAnswer(
+ spark.sql(s"CALL sys.create_function('$functionName', " +
+ "'[{\"id\": 0, \"name\":\"length\", \"type\":\"INT\"}, {\"id\": 1,
\"name\":\"width\", \"type\":\"INT\"}]'," +
+ " '[{\"id\": 0, \"name\":\"area\", \"type\":\"BIGINT\"}]', true,
'comment', 'k1=v1,k2=v2')"),
+ Row(true)
+ );
+ assertThat(
+ spark
+ .sql("SHOW FUNCTIONS")
+ .collect()
+ .map(r => r.getString(0))
+ .filter(v => v == s"paimon.test.$functionName")
+ .size).isEqualTo(1)
+ checkAnswer(
+ spark.sql(s"CALL sys.alter_function('$functionName', " +
+ "'{\"action\" : \"addDefinition\", \"name\" : \"spark\",
\"definition\" : {\"type\" : \"lambda\", \"definition\" : \"(Integer length,
Integer width) -> { return (long) length * width; }\", \"language\": \"JAVA\" }
}')"),
+ Row(true)
+ );
+ checkAnswer(
+ spark.sql(s"select paimon.test.$functionName(1, 2)"),
+ Row(2)
+ );
+ checkAnswer(
+ spark.sql(s"CALL sys.drop_function('$functionName')"),
+ Row(true)
+ );
+ assertThat(
+ spark
+ .sql("SHOW FUNCTIONS")
+ .collect()
+ .map(r => r.getString(0))
+ .filter(v => v == s"paimon.test.$functionName")
+ .size).isEqualTo(0);
+ }
+}