This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4762ff0bc6 [core] support function: add engine support function (#5604)
4762ff0bc6 is described below

commit 4762ff0bc6e5c352493d17eb8c59686afd512edb
Author: jerry <[email protected]>
AuthorDate: Thu May 29 17:42:53 2025 +0800

    [core] support function: add engine support function (#5604)
---
 docs/content/concepts/functions.md                 | 120 +++++++++++++
 docs/content/concepts/spec/_index.md               |   2 +-
 docs/content/flink/procedures.md                   |  61 ++++++-
 docs/content/spark/procedures.md                   |  59 ++++++
 docs/static/rest-catalog-open-api.yaml             |  13 +-
 .../apache/paimon/function/FunctionDefinition.java |  84 ++++++---
 .../org/apache/paimon/function/FunctionImpl.java   |  10 ++
 .../apache/paimon/rest/auth/DLFAuthProvider.java   |   5 +-
 .../org/apache/paimon/utils/ParameterUtils.java    |  19 ++
 .../org/apache/paimon/rest/MockRESTMessage.java    |   6 +-
 .../org/apache/paimon/rest/RESTCatalogServer.java  |   5 +-
 .../apache/paimon/rest/auth/AuthProviderTest.java  |   4 +-
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 117 ++++++++++--
 .../flink/procedure/AlterFunctionProcedure.java    |  62 +++++++
 .../flink/procedure/CreateFunctionProcedure.java   |  89 ++++++++++
 .../flink/procedure/DropFunctionProcedure.java     |  52 ++++++
 .../services/org.apache.paimon.factories.Factory   |   3 +
 .../org/apache/paimon/flink/RESTCatalogITCase.java |  48 +++++
 .../flink/procedure/FunctionProcedureITCase.java   |  64 +++++++
 .../spark/JavaLambdaStringToMethodConverter.java   | 107 +++++++++++
 .../apache/paimon/spark/LambdaScalarFunction.java  |  76 ++++++++
 .../paimon/spark/PaimonSparkScalarFunction.java    |  97 ++++++++++
 .../java/org/apache/paimon/spark/SparkCatalog.java |  85 ++++++++-
 .../org/apache/paimon/spark/SparkProcedures.java   |   6 +
 .../paimon/spark/catalog/SupportFunction.java      |  62 -------
 .../spark/procedure/AlterFunctionProcedure.java    | 106 +++++++++++
 .../spark/procedure/CreateFunctionProcedure.java   | 137 ++++++++++++++
 .../spark/procedure/DropFunctionProcedure.java     |  97 ++++++++++
 .../apache/paimon/spark/utils/CatalogUtils.java    | 196 ++++++++++++++++++++
 .../paimon/spark/utils/CatalogUtilsTest.java       |  58 ++++++
 .../paimon/spark/SparkCatalogWithHiveTest.java     | 197 ++++++++++-----------
 .../paimon/spark/SparkCatalogWithRestTest.java     | 174 ++++++++++++++++--
 .../spark/procedure/FunctionProcedureTest.scala    |  64 +++++++
 33 files changed, 2057 insertions(+), 228 deletions(-)

diff --git a/docs/content/concepts/functions.md 
b/docs/content/concepts/functions.md
new file mode 100644
index 0000000000..11d05f590b
--- /dev/null
+++ b/docs/content/concepts/functions.md
@@ -0,0 +1,120 @@
+---
+title: "Functions"
+weight: 9
+type: docs
+aliases:
+- /concepts/functions.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Functions
+
+Paimon introduces a Function abstraction designed to support functions in a
standard format for compute engines, addressing:
+
+- **Unified Column-Level Filtering and Processing:** Enables column-level
operations such as data encryption and decryption.
+
+- **Parameterized View Capabilities:** Supports parameterized views, making
data retrieval more dynamic and flexible.
+
+## Types of Functions Supported
+
+Currently, Paimon supports three types of functions:
+
+1. **File Function:** Users can define functions in files such as JARs,
providing flexible, modular function definitions.
+
+2. **Lambda Function:** Users can define functions as Java lambda
expressions, enabling inline, concise, functional-style operations.
+
+3. **SQL Function:** Users can define functions directly within SQL, which 
integrates seamlessly with SQL-based data processing.
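+
+These three types map onto the `FunctionDefinition` factory methods in the
+Java API added by this commit. A minimal sketch follows; the class name,
+lambda body, and SQL expression are illustrative:
+
+```java
+import org.apache.paimon.function.FunctionDefinition;
+import org.apache.paimon.function.FunctionDefinition.FunctionFileResource;
+
+import java.util.Collections;
+
+public class FunctionDefinitionExamples {
+    public static void main(String[] args) {
+        // File function: backed by a JAR resource in object storage.
+        FunctionDefinition fileDef =
+                FunctionDefinition.file(
+                        Collections.singletonList(
+                                new FunctionFileResource("JAR", "oss://my_bucket/udf.jar")),
+                        "JAVA",
+                        "com.example.MyUdf",
+                        "my_udf");
+
+        // Lambda function: an inline Java lambda expression.
+        FunctionDefinition lambdaDef =
+                FunctionDefinition.lambda(
+                        "(Integer length, Integer width) -> { return (long) length * width; }",
+                        "JAVA");
+
+        // SQL function: defined directly as a SQL expression (illustrative).
+        FunctionDefinition sqlDef = FunctionDefinition.sql("length * width");
+    }
+}
+```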
+
+## File Function Usage in Flink
+
+Paimon functions can be used within Apache Flink to execute complex data
operations. Below are the SQL commands for creating, altering, and dropping
functions in Flink environments.
+
+### Create Function
+
+To create a new function in Flink SQL:
+
+```sql
+-- Flink SQL
+CREATE FUNCTION mydb.parse_str
+    AS 'com.streaming.flink.udf.StrUdf' 
+    LANGUAGE JAVA
+    USING JAR 'oss://my_bucket/my_location/udf.jar' [, JAR 
'oss://my_bucket/my_location/a.jar'];
+```
+
+This statement creates a Java-based user-defined function named `parse_str`
within the `mydb` database, using the specified JAR files from an object
storage location.
+
+### Alter Function
+
+To modify an existing function in Flink SQL:
+
+```sql
+-- Flink SQL
+ALTER FUNCTION mydb.parse_str
+    AS 'com.streaming.flink.udf.StrUdf2' 
+    LANGUAGE JAVA;
+```
+
+This command changes the implementation of the `parse_str` function to use a 
new Java class definition.
+
+### Drop Function
+
+To remove a function from Flink SQL:
+
+```sql
+-- Flink SQL
+DROP FUNCTION mydb.parse_str;
+```
+
+This statement removes the existing `parse_str` function from the `mydb`
database.
+
+## Lambda Function Usage in Spark
+
+### Create Function
+
+```sql
+-- Spark SQL
+CALL sys.create_function(`function` => 'my_db.area_func',
+  `inputParams` => '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, 
"name":"width", "type":"INT"}]',
+  `returnParams` => '[{"id": 0, "name":"area", "type":"BIGINT"}]',
+  `deterministic` => true,
+  `comment` => 'comment',
+  `options` => 'k1=v1,k2=v2'
+);
+```
+
+### Alter Function
+
+```sql
+-- Spark SQL
+CALL sys.alter_function(`function` => 'my_db.area_func',
+  `change` => '{"action" : "addDefinition", "name" : "spark", "definition" : 
{"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return 
(long) length * width; }", "language": "JAVA" } }'
+);
+```
+```sql
+-- Spark SQL
+SELECT paimon.my_db.area_func(1, 2);
+```
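+
+Under the hood, the lambda definition string is compiled at runtime into a
+small generated Java class (see `JavaLambdaStringToMethodConverter` in this
+commit). For the definition above, the generated source is roughly of the
+following shape; the class name is "PaimonLambdaFunction" plus the function
+name, and the boxed return type shown here is illustrative:
+
+```java
+public class PaimonLambdaFunctionarea_func {
+    // The lambda parameter list and body are spliced into a static method.
+    public static Long apply(Integer length, Integer width) {
+        return (long) length * width;
+    }
+}
+```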
+
+### Drop Function
+
+```sql
+-- Spark SQL
+CALL sys.drop_function(`function` => 'my_db.area_func');
+```
diff --git a/docs/content/concepts/spec/_index.md 
b/docs/content/concepts/spec/_index.md
index 03bcd72234..a1a1251c74 100644
--- a/docs/content/concepts/spec/_index.md
+++ b/docs/content/concepts/spec/_index.md
@@ -1,7 +1,7 @@
 ---
 title: Specification
 bookCollapseSection: true
-weight: 9
+weight: 10
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 1fa2399c79..1793c71ff9 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -166,7 +166,7 @@ All available procedures are listed below.
          CALL sys.create_tag(`table` => 'default.T', tag => 'my_tag', 
snapshot_id => cast(10 as bigint), time_retained => '1 d')
       </td>
    </tr>
-    <tr>
+   <tr>
       <td>create_tag_from_timestamp</td>
       <td>
          -- Create a tag from the first snapshot whose commit-time greater 
than the specified timestamp. <br/>
@@ -833,5 +833,64 @@ All available procedures are listed below.
          CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 
'drop')<br/><br/>
       </td>
    </tr>
+   <tr>
+      <td>create_function</td>
+      <td>
+         CALL [catalog.]sys.create_function(<br/>
+                'function_identifier',<br/>
+                '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, 
"name":"width", "type":"INT"}]',<br/>
+                '[{"id": 0, "name":"area", "type":"BIGINT"}]',<br/>
+                true, 'comment', 'k1=v1,k2=v2')<br/>
+      </td>
+      <td>
+         To create a function. Arguments:
+            <li>function: the target function identifier. Cannot be empty.</li>
+            <li>inputParams: the input parameters of the function, as a JSON array of data fields.</li>
+            <li>returnParams: the return parameters of the function, as a JSON array of data fields.</li>
+            <li>deterministic: Whether the function is deterministic.</li>
+            <li>comment: The comment for the function.</li>
+            <li>options: the additional dynamic options of the function.</li>
+      </td>
+      <td>
+         CALL sys.create_function(`function` => 'function_identifier',<br/>
+              inputParams => '[{"id": 0, "name":"length", "type":"INT"}, 
{"id": 1, "name":"width", "type":"INT"}]',<br/>
+              returnParams => '[{"id": 0, "name":"area", 
"type":"BIGINT"}]',<br/>
+              deterministic => true,<br/>
+              comment => 'comment',<br/>
+              options => 'k1=v1,k2=v2'<br/>
+         )<br/>
+      </td>
+   </tr>
+   <tr>
+      <td>alter_function</td>
+      <td>
+         CALL [catalog.]sys.alter_function(<br/>
+                'function_identifier',<br/>
+                '{"action" : "addDefinition", "name" : "flink", "definition" : 
{"type" : "file", "fileResources" : [{"resourceType": "JAR", "uri": 
"oss://mybucket/xxxx.jar"}], "language": "JAVA", "className": "xxxx", 
"functionName": "functionName" } }')<br/>
+      </td>
+      <td>
+         To alter a function. Arguments:
+            <li>function: the target function identifier. Cannot be empty.</li>
+            <li>change: the function change to apply, as a JSON object.</li>
+      </td>
+      <td>
+         CALL sys.alter_function(`function` => 'function_identifier',<br/>
+              `change` => '{"action" : "addDefinition", "name" : "flink", 
"definition" : {"type" : "file", "fileResources" : [{"resourceType": "JAR", 
"uri": "oss://mybucket/xxxx.jar"}], "language": "JAVA", "className": "xxxx", 
"functionName": "functionName" } }'<br/>
+         )<br/>
+      </td>
+   </tr>
+   <tr>
+      <td>drop_function</td>
+      <td>
+         CALL [catalog.]sys.drop_function('function_identifier')<br/>
+      </td>
+      <td>
+         To drop a function. Arguments:
+            <li>function: the target function identifier. Cannot be empty.</li>
+      </td>
+      <td>
+         CALL sys.drop_function(`function` => 'function_identifier')<br/>
+      </td>
+   </tr>
    </tbody>
 </table>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 715c0d7037..b09a6b7fdd 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -406,6 +406,65 @@ This section introduce all available spark procedures 
about paimon.
          CALL sys.alter_view_dialect('view_identifier', 'drop', 'spark')<br/>
          CALL sys.alter_view_dialect(`view` => 'view_identifier', `action` => 
'drop')<br/><br/>
       </td>
+   </tr>
+      <tr>
+      <td>create_function</td>
+      <td>
+         CALL sys.create_function(<br/>
+                'function_identifier',<br/>
+                '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, 
"name":"width", "type":"INT"}]',<br/>
+                '[{"id": 0, "name":"area", "type":"BIGINT"}]',<br/>
+                true, 'comment', 'k1=v1,k2=v2')<br/>
+      </td>
+      <td>
+         To create a function. Arguments:
+            <li>function: the target function identifier. Cannot be empty.</li>
+            <li>inputParams: the input parameters of the function, as a JSON array of data fields.</li>
+            <li>returnParams: the return parameters of the function, as a JSON array of data fields.</li>
+            <li>deterministic: Whether the function is deterministic.</li>
+            <li>comment: The comment for the function.</li>
+            <li>options: the additional dynamic options of the function.</li>
+      </td>
+      <td>
+         CALL sys.create_function(`function` => 'function_identifier',<br/>
+              `inputParams` => '[{"id": 0, "name":"length", "type":"INT"}, 
{"id": 1, "name":"width", "type":"INT"}]',<br/>
+              `returnParams` => '[{"id": 0, "name":"area", 
"type":"BIGINT"}]',<br/>
+              `deterministic` => true,<br/>
+              `comment` => 'comment',<br/>
+              `options` => 'k1=v1,k2=v2'<br/>
+         )<br/>
+      </td>
+   </tr>
+   <tr>
+      <td>alter_function</td>
+      <td>
+         CALL sys.alter_function(<br/>
+                'function_identifier',<br/>
+                '{"action" : "addDefinition", "name" : "spark", "definition" : 
{"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return 
(long) length * width; }", "language": "JAVA" } }')<br/>
+      </td>
+      <td>
+         To alter a function. Arguments:
+            <li>function: the target function identifier. Cannot be empty.</li>
+            <li>change: the function change to apply, as a JSON object.</li>
+      </td>
+      <td>
+         CALL sys.alter_function(`function` => 'function_identifier',<br/>
+              `change` => '{"action" : "addDefinition", "name" : "spark", 
"definition" : {"type" : "lambda", "definition" : "(Integer length, Integer 
width) -> { return (long) length * width; }", "language": "JAVA" } }'<br/>
+         )<br/>
+      </td>
+   </tr>
+   <tr>
+      <td>drop_function</td>
+      <td>
+         CALL [catalog.]sys.drop_function('function_identifier')<br/>
+      </td>
+      <td>
+         To drop a function. Arguments:
+            <li>function: the target function identifier. Cannot be empty.</li>
+      </td>
+      <td>
+         CALL sys.drop_function(`function` => 'function_identifier')<br/>
+      </td>
    </tr>
    </tbody>
 </table>
diff --git a/docs/static/rest-catalog-open-api.yaml 
b/docs/static/rest-catalog-open-api.yaml
index 00878178c8..384bb6fe7a 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -1997,12 +1997,10 @@ components:
         type:
           type: string
           const: "file"
-        fileType:
-          type: string
-        storagePaths:
+        fileResources:
           type: array
           items:
-            type: string
+            $ref: '#/components/schemas/FunctionFileResource'
         language:
           type: string
         className:
@@ -2020,6 +2018,13 @@ components:
           type: string
         language:
           type: string
+    FunctionFileResource:
+      type: object
+      properties:
+        resourceType:
+          type: string
+        uri:
+          type: string
     ViewChange:
       anyOf:
         - $ref: '#/components/schemas/SetViewOption'
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/function/FunctionDefinition.java 
b/paimon-api/src/main/java/org/apache/paimon/function/FunctionDefinition.java
index 5683797ee2..b0550ab939 100644
--- 
a/paimon-api/src/main/java/org/apache/paimon/function/FunctionDefinition.java
+++ 
b/paimon-api/src/main/java/org/apache/paimon/function/FunctionDefinition.java
@@ -46,13 +46,12 @@ import java.util.Objects;
 public interface FunctionDefinition {
 
     static FunctionDefinition file(
-            String fileType,
-            List<String> storagePaths,
+            List<FunctionFileResource> fileResources,
             String language,
             String className,
             String functionName) {
         return new FunctionDefinition.FileFunctionDefinition(
-                fileType, storagePaths, language, className, functionName);
+                fileResources, language, className, functionName);
     }
 
     static FunctionDefinition sql(String definition) {
@@ -67,17 +66,13 @@ public interface FunctionDefinition {
     @JsonIgnoreProperties(ignoreUnknown = true)
     final class FileFunctionDefinition implements FunctionDefinition {
 
-        private static final String FIELD_FILE_TYPE = "fileType";
-        private static final String FIELD_STORAGE_PATHS = "storagePaths";
+        private static final String FIELD_FILE_RESOURCES = "fileResources";
         private static final String FIELD_LANGUAGE = "language";
         private static final String FIELD_CLASS_NAME = "className";
         private static final String FIELD_FUNCTION_NAME = "functionName";
 
-        @JsonProperty(FIELD_FILE_TYPE)
-        private final String fileType;
-
-        @JsonProperty(FIELD_STORAGE_PATHS)
-        private final List<String> storagePaths;
+        @JsonProperty(FIELD_FILE_RESOURCES)
+        private final List<FunctionFileResource> fileResources;
 
         @JsonProperty(FIELD_LANGUAGE)
         private String language;
@@ -89,26 +84,19 @@ public interface FunctionDefinition {
         private String functionName;
 
         public FileFunctionDefinition(
-                @JsonProperty(FIELD_FILE_TYPE) String fileType,
-                @JsonProperty(FIELD_STORAGE_PATHS) List<String> storagePaths,
+                @JsonProperty(FIELD_FILE_RESOURCES) List<FunctionFileResource> 
fileResources,
                 @JsonProperty(FIELD_LANGUAGE) String language,
                 @JsonProperty(FIELD_CLASS_NAME) String className,
                 @JsonProperty(FIELD_FUNCTION_NAME) String functionName) {
-            this.fileType = fileType;
-            this.storagePaths = storagePaths;
+            this.fileResources = fileResources;
             this.language = language;
             this.className = className;
             this.functionName = functionName;
         }
 
-        @JsonGetter(FIELD_FILE_TYPE)
-        public String fileType() {
-            return fileType;
-        }
-
-        @JsonGetter(FIELD_STORAGE_PATHS)
-        public List<String> storagePaths() {
-            return storagePaths;
+        @JsonGetter(FIELD_FILE_RESOURCES)
+        public List<FunctionFileResource> fileResources() {
+            return fileResources;
         }
 
         @JsonGetter(FIELD_LANGUAGE)
@@ -135,8 +123,7 @@ public interface FunctionDefinition {
                 return false;
             }
             FileFunctionDefinition that = (FileFunctionDefinition) o;
-            return fileType.equals(that.fileType)
-                    && Objects.equals(storagePaths, that.storagePaths)
+            return Objects.equals(fileResources, that.fileResources)
                     && Objects.equals(language, that.language)
                     && Objects.equals(className, that.className)
                     && Objects.equals(functionName, that.functionName);
@@ -144,8 +131,8 @@ public interface FunctionDefinition {
 
         @Override
         public int hashCode() {
-            int result = Objects.hash(fileType, language, className, 
functionName);
-            result = 31 * result + Objects.hashCode(storagePaths);
+            int result = Objects.hash(language, className, functionName);
+            result = 31 * result + Objects.hashCode(fileResources);
             return result;
         }
     }
@@ -239,4 +226,49 @@ public interface FunctionDefinition {
 
         private Types() {}
     }
+
+    /** Function file resource. */
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    class FunctionFileResource {
+
+        private static final String FIELD_RESOURCE_TYPE = "resourceType";
+        private static final String FIELD_URI = "uri";
+
+        private final String resourceType;
+        private final String uri;
+
+        public FunctionFileResource(
+                @JsonProperty(FIELD_RESOURCE_TYPE) String resourceType,
+                @JsonProperty(FIELD_URI) String uri) {
+            this.resourceType = resourceType;
+            this.uri = uri;
+        }
+
+        @JsonGetter(FIELD_RESOURCE_TYPE)
+        public String resourceType() {
+            return resourceType;
+        }
+
+        @JsonGetter(FIELD_URI)
+        public String uri() {
+            return uri;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            FunctionFileResource that = (FunctionFileResource) o;
+            return resourceType.equals(that.resourceType) && 
uri.equals(that.uri);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(resourceType, uri);
+        }
+    }
 }
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java 
b/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
index b0290ea50e..f617aa3a97 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
+++ b/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
@@ -59,6 +59,16 @@ public class FunctionImpl implements Function {
         this.options = options;
     }
 
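+    /**
+     * Creates a function that carries only engine definitions; input/return
+     * parameters, comment and options are left unset.
+     */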
+    public FunctionImpl(Identifier identifier, Map<String, FunctionDefinition> 
definitions) {
+        this.identifier = identifier;
+        this.inputParams = null;
+        this.returnParams = null;
+        this.deterministic = true;
+        this.definitions = definitions;
+        this.comment = null;
+        this.options = null;
+    }
+
     @Override
     public String name() {
         return identifier.getObjectName();
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
index 94814d6d85..a3fe6c4950 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/auth/DLFAuthProvider.java
@@ -20,7 +20,6 @@ package org.apache.paimon.rest.auth;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 
-import okhttp3.MediaType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +50,7 @@ public class DLFAuthProvider implements AuthProvider {
 
     public static final DateTimeFormatter AUTH_DATE_TIME_FORMATTER =
             DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'");
-    protected static final MediaType MEDIA_TYPE = 
MediaType.parse("application/json");
+    protected static final String MEDIA_TYPE = "application/json";
 
     @Nullable private final DLFTokenLoader tokenLoader;
     @Nullable protected DLFToken token;
@@ -143,7 +142,7 @@ public class DLFAuthProvider implements AuthProvider {
         signHeaders.put(DLF_CONTENT_SHA56_HEADER_KEY, DLF_CONTENT_SHA56_VALUE);
         signHeaders.put(DLF_AUTH_VERSION_HEADER_KEY, DLFAuthSignature.VERSION);
         if (data != null && !data.isEmpty()) {
-            signHeaders.put(DLF_CONTENT_TYPE_KEY, MEDIA_TYPE.toString());
+            signHeaders.put(DLF_CONTENT_TYPE_KEY, MEDIA_TYPE);
             signHeaders.put(DLF_CONTENT_MD5_HEADER_KEY, 
DLFAuthSignature.md5(data));
         }
         if (securityToken != null) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
index 8284836a3b..bbb90e2f98 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParameterUtils.java
@@ -18,6 +18,11 @@
 
 package org.apache.paimon.utils;
 
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeJsonParser;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -70,4 +75,18 @@ public class ParameterUtils {
         }
         mapList.put(kv[0].trim(), valueList);
     }
+
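+    /**
+     * Parses a JSON array of data fields, e.g. '[{"id": 0, "name": "length",
+     * "type": "INT"}]', into a list of {@link DataField}s. Returns an empty
+     * list when the input is null or not a JSON array.
+     */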
+    public static List<DataField> parseDataFieldArray(String data) {
+        List<DataField> list = new ArrayList<>();
+        if (data != null) {
+            JsonNode jsonArray = JsonSerdeUtil.fromJson(data, JsonNode.class);
+            if (jsonArray.isArray()) {
+                for (JsonNode objNode : jsonArray) {
+                    DataField dataField = 
DataTypeJsonParser.parseDataField(objNode);
+                    list.add(dataField);
+                }
+            }
+        }
+        return list;
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
index acb7a8ff56..fe15ba5370 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
@@ -324,7 +324,11 @@ public class MockRESTMessage {
                 Lists.newArrayList(new DataField(0, "area", 
DataTypes.DOUBLE()));
         FunctionDefinition flinkFunction =
                 FunctionDefinition.file(
-                        "jar", Lists.newArrayList("/a/b/c.jar"), "java", 
"className", "eval");
+                        Lists.newArrayList(
+                                new 
FunctionDefinition.FunctionFileResource("jar", "/a/b/c.jar")),
+                        "java",
+                        "className",
+                        "eval");
         FunctionDefinition sparkFunction =
                 FunctionDefinition.lambda(
                         "(Double length, Double width) -> length * width", 
"java");
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 07e338c10d..d183184850 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -903,7 +903,10 @@ public class RESTCatalogServer {
                         RESTApi.fromJson(data, AlterFunctionRequest.class);
                 HashMap<String, FunctionDefinition> newDefinitions =
                         new HashMap<>(function.definitions());
-                Map<String, String> newOptions = new 
HashMap<>(function.options());
+                Map<String, String> newOptions =
+                        function.options() != null
+                                ? new HashMap<>(function.options())
+                                : new HashMap<>();
                 String newComment = function.comment();
                 for (FunctionChange functionChange : requestBody.changes()) {
                     if (functionChange instanceof 
FunctionChange.SetFunctionOption) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
index 35780998c5..cbabd69037 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/AuthProviderTest.java
@@ -366,9 +366,7 @@ public class AuthProviderTest {
         assertTrue(header.containsKey(DLF_DATE_HEADER_KEY));
         assertEquals(
                 DLFAuthSignature.VERSION, 
header.get(DLFAuthProvider.DLF_AUTH_VERSION_HEADER_KEY));
-        assertEquals(
-                DLFAuthProvider.MEDIA_TYPE.toString(),
-                header.get(DLFAuthProvider.DLF_CONTENT_TYPE_KEY));
+        assertEquals(DLFAuthProvider.MEDIA_TYPE, 
header.get(DLFAuthProvider.DLF_CONTENT_TYPE_KEY));
         assertEquals(
                 DLFAuthSignature.md5(data), 
header.get(DLFAuthProvider.DLF_CONTENT_MD5_HEADER_KEY));
         assertEquals(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index ed4f3b8242..10f437e8bd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -29,6 +29,9 @@ import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.flink.procedure.ProcedureUtil;
 import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
 import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
+import org.apache.paimon.function.FunctionChange;
+import org.apache.paimon.function.FunctionDefinition;
+import org.apache.paimon.function.FunctionImpl;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
@@ -49,11 +52,14 @@ import org.apache.paimon.utils.StringUtils;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewImpl;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionImpl;
@@ -61,6 +67,7 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
@@ -89,6 +96,7 @@ import 
org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
 import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
@@ -101,6 +109,8 @@ import 
org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.procedures.Procedure;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -168,6 +178,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
 public class FlinkCatalog extends AbstractCatalog {
 
     public static final String DIALECT = "flink";
+    public static final String FUNCTION_DEFINITION_NAME = "flink";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkCatalog.class);
 
@@ -1460,40 +1471,124 @@ public class FlinkCatalog extends AbstractCatalog {
 
     @Override
     public final List<String> listFunctions(String dbName) throws 
CatalogException {
-        return Collections.emptyList();
+        try {
+            return catalog.listFunctions(dbName);
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new CatalogException(e.getMessage(), e);
+        }
     }
 
     @Override
     public final CatalogFunction getFunction(ObjectPath functionPath)
             throws FunctionNotExistException, CatalogException {
-        throw new FunctionNotExistException(getName(), functionPath);
+        try {
+            org.apache.paimon.function.Function function =
+                    catalog.getFunction(toIdentifier(functionPath));
+            FunctionDefinition functionDefinition = 
function.definition(FUNCTION_DEFINITION_NAME);
+            // currently only file functions are supported, so check the definition type
+            if (functionDefinition instanceof 
FunctionDefinition.FileFunctionDefinition) {
+                FunctionDefinition.FileFunctionDefinition 
fileFunctionDefinition =
+                        (FunctionDefinition.FileFunctionDefinition) 
functionDefinition;
+                List<ResourceUri> resourceUris =
+                        fileFunctionDefinition.fileResources().stream()
+                                .map(
+                                        resource ->
+                                                new ResourceUri(
+                                                        ResourceType.valueOf(
+                                                                
resource.resourceType()),
+                                                        resource.uri()))
+                                .collect(Collectors.toList());
+                return new CatalogFunctionImpl(
+                        fileFunctionDefinition.className(),
+                        
FunctionLanguage.valueOf(fileFunctionDefinition.language()),
+                        resourceUris);
+            }
+            throw new FunctionNotExistException(getName(), functionPath);
+        } catch (Catalog.FunctionNotExistException e) {
+            throw new FunctionNotExistException(getName(), functionPath);
+        }
     }
 
     @Override
     public final boolean functionExists(ObjectPath functionPath) throws 
CatalogException {
-        return false;
+        try {
+            return catalog.listFunctions(functionPath.getDatabaseName())
+                    .contains(functionPath.getObjectName());
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new CatalogException(e.getMessage(), e);
+        }
     }
 
     @Override
     public final void createFunction(
             ObjectPath functionPath, CatalogFunction function, boolean 
ignoreIfExists)
-            throws CatalogException {
-        throw new UnsupportedOperationException(
-                "Create function is not supported,"
-                        + " maybe you can use 'CREATE TEMPORARY FUNCTION' 
instead.");
+            throws FunctionAlreadyExistException, CatalogException {
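+        // Map the Flink resource URIs to Paimon file resources, registered
+        // below under the "flink" definition name.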
+        List<FunctionDefinition.FunctionFileResource> fileResources =
+                function.getFunctionResources().stream()
+                        .map(
+                                r ->
+                                        new 
FunctionDefinition.FunctionFileResource(
+                                                r.getResourceType().name(), 
r.getUri()))
+                        .collect(Collectors.toList());
+        FunctionDefinition functionDefinition =
+                FunctionDefinition.file(
+                        fileResources,
+                        function.getFunctionLanguage().name(),
+                        function.getClassName(),
+                        functionPath.getObjectName());
+        Map<String, FunctionDefinition> definitions = new HashMap<>();
+        definitions.put(FUNCTION_DEFINITION_NAME, functionDefinition);
+        org.apache.paimon.function.Function paimonFunction =
+                new FunctionImpl(toIdentifier(functionPath), definitions);
+        try {
+            catalog.createFunction(toIdentifier(functionPath), paimonFunction, 
ignoreIfExists);
+        } catch (Catalog.FunctionAlreadyExistException | 
Catalog.DatabaseNotExistException e) {
+            throw new FunctionAlreadyExistException(getName(), functionPath);
+        }
     }
 
     @Override
     public final void alterFunction(
             ObjectPath functionPath, CatalogFunction newFunction, boolean 
ignoreIfNotExists)
-            throws CatalogException {
-        throw new UnsupportedOperationException();
+            throws FunctionNotExistException, CatalogException {
+        try {
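+            // Keep the existing file resources, updating only the language
+            // and class name from the new Flink function definition.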
+            org.apache.paimon.function.Function function =
+                    catalog.getFunction(toIdentifier(functionPath));
+            FunctionDefinition.FileFunctionDefinition functionDefinition =
+                    (FunctionDefinition.FileFunctionDefinition)
+                            function.definition(FUNCTION_DEFINITION_NAME);
+            if (functionDefinition != null) {
+                FunctionDefinition newFunctionDefinition =
+                        FunctionDefinition.file(
+                                functionDefinition.fileResources(),
+                                newFunction.getFunctionLanguage().name(),
+                                newFunction.getClassName(),
+                                functionPath.getObjectName());
+                FunctionChange functionChange =
+                        FunctionChange.updateDefinition(
+                                FUNCTION_DEFINITION_NAME, 
newFunctionDefinition);
+                catalog.alterFunction(
+                        toIdentifier(functionPath),
+                        ImmutableList.of(functionChange),
+                        ignoreIfNotExists);
+            }
+        } catch (Catalog.FunctionNotExistException e) {
+            throw new FunctionNotExistException(getName(), functionPath);
+        } catch (Catalog.DefinitionAlreadyExistException e) {
+            throw new RuntimeException(e);
+        } catch (Catalog.DefinitionNotExistException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     public final void dropFunction(ObjectPath functionPath, boolean 
ignoreIfNotExists)
-            throws CatalogException {
-        throw new UnsupportedOperationException();
+            throws FunctionNotExistException, CatalogException {
+        try {
+            catalog.dropFunction(toIdentifier(functionPath), 
ignoreIfNotExists);
+        } catch (Catalog.FunctionNotExistException e) {
+            throw new FunctionNotExistException(getName(), functionPath);
+        }
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterFunctionProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterFunctionProcedure.java
new file mode 100644
index 0000000000..bff94aa63e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterFunctionProcedure.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.function.FunctionChange;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Alter function procedure. Usage:
+ *
+ * <pre><code>
+ *
+ *  CALL sys.alter_function('function_identifier', '{"action" : 
"addDefinition", "name" : "flink", "definition" : {"type" : "file", 
"fileResources" : [{"resourceType": "JAR", "uri": "oss://mybucket/xxxx.jar"}], 
"language": "JAVA", "className": "xxxx", "functionName": "functionName" } }')
+ *
+ * </code></pre>
+ */
+public class AlterFunctionProcedure extends ProcedureBase {
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "function", type = 
@DataTypeHint("STRING")),
+                @ArgumentHint(name = "change", type = @DataTypeHint("STRING")),
+            })
+    public String[] call(ProcedureContext procedureContext, String function, 
String change)
+            throws Catalog.FunctionNotExistException, 
Catalog.DefinitionAlreadyExistException,
+                    Catalog.DefinitionNotExistException {
+        Identifier identifier = Identifier.fromString(function);
+        FunctionChange functionChange = JsonSerdeUtil.fromJson(change, 
FunctionChange.class);
+        catalog.alterFunction(identifier, ImmutableList.of(functionChange), 
false);
+        return new String[] {"Success"};
+    }
+
+    @Override
+    public String identifier() {
+        return "alter_function";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateFunctionProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateFunctionProcedure.java
new file mode 100644
index 0000000000..77f2d631fa
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateFunctionProcedure.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.function.FunctionImpl;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Optional;
+
+/**
+ * Create function procedure. Usage:
+ *
+ * <pre><code>
+ *  -- NOTE: use '' as placeholder for optional arguments
+ *
+ *  CALL sys.create_function('function_identifier',
+ *     '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, "name":"width",
"type":"INT"}]',
+ *     '[{"id": 0, "name":"area", "type":"BIGINT"}]',
+ *     true, 'comment'
+ *    )
+ *
+ * </code></pre>
+ */
+public class CreateFunctionProcedure extends ProcedureBase {
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "function", type = 
@DataTypeHint("STRING")),
+                @ArgumentHint(name = "inputParams", type = 
@DataTypeHint("STRING")),
+                @ArgumentHint(name = "returnParams", type = 
@DataTypeHint("STRING")),
+                @ArgumentHint(
+                        name = "deterministic",
+                        type = @DataTypeHint("BOOLEAN"),
+                        isOptional = true),
+                @ArgumentHint(name = "comment", type = 
@DataTypeHint("STRING"), isOptional = true),
+                @ArgumentHint(name = "options", type = 
@DataTypeHint("STRING"), isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String function,
+            String inputParams,
+            String returnParams,
+            Boolean deterministic,
+            String comment,
+            String options)
+            throws Catalog.FunctionAlreadyExistException, 
Catalog.DatabaseNotExistException {
+        Identifier identifier = Identifier.fromString(function);
+        FunctionImpl functionImpl =
+                new FunctionImpl(
+                        identifier,
+                        ParameterUtils.parseDataFieldArray(inputParams),
+                        ParameterUtils.parseDataFieldArray(returnParams),
+                        Optional.ofNullable(deterministic).orElse(true),
+                        Maps.newHashMap(),
+                        comment,
+                        ParameterUtils.parseCommaSeparatedKeyValues(options));
+        catalog.createFunction(identifier, functionImpl, false);
+        return new String[] {"Success"};
+    }
+
+    @Override
+    public String identifier() {
+        return "create_function";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropFunctionProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropFunctionProcedure.java
new file mode 100644
index 0000000000..6cf957fa96
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropFunctionProcedure.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Drop function procedure. Usage:
+ *
+ * <pre><code>
+ *
+ *  CALL sys.drop_function('function_identifier')
+ *
+ * </code></pre>
+ */
+public class DropFunctionProcedure extends ProcedureBase {
+    @ProcedureHint(argument = {@ArgumentHint(name = "function", type = 
@DataTypeHint("STRING"))})
+    public String[] call(ProcedureContext procedureContext, String function)
+            throws Catalog.FunctionNotExistException {
+        Identifier identifier = Identifier.fromString(function);
+        catalog.dropFunction(identifier, false);
+        return new String[] {"Success"};
+    }
+
+    @Override
+    public String identifier() {
+        return "drop_function";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index b344ea56af..df615478c5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -92,3 +92,6 @@ org.apache.paimon.flink.procedure.ClearConsumersProcedure
 org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure
 org.apache.paimon.flink.procedure.RescaleProcedure
 org.apache.paimon.flink.procedure.AlterViewDialectProcedure
+org.apache.paimon.flink.procedure.CreateFunctionProcedure
+org.apache.paimon.flink.procedure.DropFunctionProcedure
+org.apache.paimon.flink.procedure.AlterFunctionProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 3ec704a10c..7742bbb3f3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -21,8 +21,16 @@ package org.apache.paimon.flink;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.rest.RESTToken;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
@@ -102,4 +110,44 @@ class RESTCatalogITCase extends RESTCatalogITCaseBase {
         assertThat(batchSql(String.format("SELECT * FROM %s.%s", 
DATABASE_NAME, TABLE_NAME)))
                 .containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 
22.0D));
     }
+
+    @Test
+    public void testFunction() throws Exception {
+        Catalog catalog = tEnv.getCatalog("PAIMON").get();
+        String functionName = "test_str2";
+        String identifier = "com.streaming.flink.udf.StrUdf";
+        String jarResourcePath = "xxxx.jar";
+        String jarResourcePath2 = "xxxx-yyyyy.jar";
+        CatalogFunctionImpl function =
+                new CatalogFunctionImpl(
+                        identifier,
+                        FunctionLanguage.JAVA,
+                        ImmutableList.of(
+                                new ResourceUri(ResourceType.JAR, 
jarResourcePath),
+                                new ResourceUri(ResourceType.JAR, 
jarResourcePath2)));
+        sql(
+                String.format(
+                        "CREATE FUNCTION %s.%s AS '%s' LANGUAGE %s USING %s 
'%s', %s '%s'",
+                        DATABASE_NAME,
+                        functionName,
+                        function.getClassName(),
+                        function.getFunctionLanguage(),
+                        ResourceType.JAR,
+                        jarResourcePath,
+                        ResourceType.JAR,
+                        jarResourcePath2));
+        assertThat(batchSql("SHOW FUNCTIONS")).contains(Row.of(functionName));
+        ObjectPath functionObjectPath = new ObjectPath(DATABASE_NAME, 
functionName);
+        CatalogFunction getFunction = catalog.getFunction(functionObjectPath);
+        assertThat(getFunction).isEqualTo(function);
+        identifier = "com.streaming.flink.udf.StrUdf2";
+        sql(
+                String.format(
+                        "ALTER FUNCTION PAIMON.%s.%s AS '%s' LANGUAGE %s",
+                        DATABASE_NAME, functionName, identifier, 
function.getFunctionLanguage()));
+        getFunction = catalog.getFunction(functionObjectPath);
+        assertThat(getFunction.getClassName()).isEqualTo(identifier);
+        sql(String.format("DROP FUNCTION %s.%s", DATABASE_NAME, functionName));
+        assertThat(catalog.functionExists(functionObjectPath)).isFalse();
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FunctionProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FunctionProcedureITCase.java
new file mode 100644
index 0000000000..33ba3e9a3f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/FunctionProcedureITCase.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.RESTCatalogITCaseBase;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for function procedure. */
+public class FunctionProcedureITCase extends RESTCatalogITCaseBase {
+
+    @Test
+    public void test() throws FunctionNotExistException {
+        String functionName = "test_function";
+        List<Row> result =
+                sql(
+                        String.format(
+                                "CALL sys.create_function('%s.%s', "
+                                        + "'[{\"id\": 0, \"name\":\"length\", 
\"type\":\"INT\"}, {\"id\": 1, \"name\":\"width\", \"type\":\"INT\"}]',"
+                                        + "'[{\"id\": 0, \"name\":\"area\", 
\"type\":\"BIGINT\"}]', true, 'comment', 'k1=v1,k2=v2')",
+                                DATABASE_NAME, functionName));
+        assertThat(result.toString()).contains("Success");
+        assertThat(batchSql("SHOW FUNCTIONS")).contains(Row.of(functionName));
+        result =
+                sql(
+                        String.format(
+                                "CALL sys.alter_function('%s.%s', '{\"action\" 
: \"addDefinition\", \"name\" : \"flink\", \"definition\" : {\"type\" : 
\"file\", \"fileResources\" : [{\"resourceType\": \"JAR\", \"uri\": 
\"oss://mybucket/xxxx.jar\"}], \"language\": \"JAVA\", \"className\": \"xxxx\", 
\"functionName\": \"functionName\" } }')",
+                                DATABASE_NAME, functionName));
+        assertThat(result.toString()).contains("Success");
+        Catalog catalog = tEnv.getCatalog("PAIMON").get();
+        ObjectPath functionObjectPath = new ObjectPath(DATABASE_NAME, 
functionName);
+        assertThat(batchSql("SHOW FUNCTIONS")).contains(Row.of(functionName));
+        CatalogFunction getFunction = catalog.getFunction(functionObjectPath);
+        assertThat(getFunction.getClassName()).isEqualTo("xxxx");
+        result = sql(String.format("CALL sys.drop_function('%s.%s')", 
DATABASE_NAME, functionName));
+        assertThat(result.toString()).contains("Success");
+        assertThat(batchSql("SHOW FUNCTIONS")).doesNotContain(Row.of(functionName));
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/JavaLambdaStringToMethodConverter.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/JavaLambdaStringToMethodConverter.java
new file mode 100644
index 0000000000..baf443cd5b
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/JavaLambdaStringToMethodConverter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import javax.tools.JavaCompiler;
+import javax.tools.JavaFileObject;
+import javax.tools.StandardJavaFileManager;
+import javax.tools.ToolProvider;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Util class for compiling and loading Java lambda expressions as methods. */
+public class JavaLambdaStringToMethodConverter {
+    private static final Pattern LAMBDA_PATTERN =
+            Pattern.compile("\\s*\\(([^)]*)\\)\\s*->\\s*(.+)\\s*");
+
+    public static String getClassName(String functionName) {
+        return "PaimonLambdaFunction" + functionName;
+    }
+
+    public static String getSourceFileName(String functionName) {
+        return String.format("%s.java", getClassName(functionName));
+    }
+
+    public static Method compileAndLoadMethod(
+            String functionName, String lambdaExpression, String returnType) throws Exception {
+        String className = getClassName(functionName);
+        String fullMethod = parseLambdaWithType(returnType, lambdaExpression, "apply");
+
+        String javaCode = "public class " + className + " { " + fullMethod + " }";
+
+        File sourceFile = new File(getSourceFileName(functionName));
+        try (FileWriter writer = new FileWriter(sourceFile)) {
+            writer.write(javaCode);
+        }
+
+        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+        StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);
+        Iterable<? extends JavaFileObject> compilationUnits =
+                fileManager.getJavaFileObjectsFromFiles(Arrays.asList(sourceFile));
+        // Fail fast on compilation errors instead of loading a missing or stale class.
+        Boolean success = compiler.getTask(null, fileManager, null, null, null, compilationUnits).call();
+        fileManager.close();
+        if (!Boolean.TRUE.equals(success)) {
+            throw new IllegalStateException("Failed to compile lambda expression: " + lambdaExpression);
+        }
+        URLClassLoader classLoader =
+                URLClassLoader.newInstance(new URL[] {new File(".").toURI().toURL()});
+        Class<?> compiledClass = Class.forName(className, true, classLoader);
+        return compiledClass.getDeclaredMethods()[0];
+    }
+
+    public static String parseLambdaWithType(
+            String outputType, String expression, String methodName) {
+        Matcher matcher = LAMBDA_PATTERN.matcher(expression);
+        if (!matcher.matches()) {
+            throw new IllegalArgumentException("Invalid lambda expression: " + 
expression);
+        }
+
+        String parameters = matcher.group(1).trim();
+        String body = matcher.group(2).trim();
+
+        StringBuilder method = new StringBuilder();
+        method.append("public static ")
+                .append(outputType)
+                .append(" ")
+                .append(methodName)
+                .append("(")
+                .append(parameters)
+                .append(")");
+
+        if (!body.startsWith("{")) {
+            method.append("{");
+        }
+        if (!body.contains("return ")) {
+            method.append("return ");
+        }
+        if (!body.contains(";")) {
+            method.append(body).append(";");
+        } else {
+            method.append(body);
+        }
+        if (!body.endsWith("}")) {
+            method.append("}");
+        }
+        return method.toString();
+    }
+}
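
To make the generated shape concrete: for a hypothetical functionName "area",
returnType "java.lang.Integer" and lambda string "(Integer x, Integer y) -> x * y",
parseLambdaWithType produces

    public static java.lang.Integer apply(Integer x, Integer y){return x * y;}

and compileAndLoadMethod wraps that into class PaimonLambdaFunctionarea, writes it
to PaimonLambdaFunctionarea.java in the working directory, compiles it with the
system JavaCompiler, and returns the reflective handle to the generated method.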
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/LambdaScalarFunction.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/LambdaScalarFunction.java
new file mode 100644
index 0000000000..5621df0cff
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/LambdaScalarFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Paimon Spark {@link UnboundFunction} backed by a Java lambda expression string. */
+public class LambdaScalarFunction implements UnboundFunction, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private DataType outputType;
+    private String javaType;
+    private String functionName;
+    private String lambdaExpression;
+
+    public LambdaScalarFunction(
+            String functionName, DataType outputType, String javaType, String lambdaExpression) {
+        // The lambda is only stored here; compilation is deferred until the bound
+        // function is first evaluated.
+        this.outputType = outputType;
+        this.javaType = javaType;
+        this.functionName = functionName;
+        this.lambdaExpression = lambdaExpression;
+    }
+
+    @Override
+    public BoundFunction bind(StructType inputType) {
+        try {
+            List<DataType> inputTypes = new ArrayList<>();
+            for (StructField field : inputType.fields()) {
+                inputTypes.add(field.dataType());
+            }
+            return new PaimonSparkScalarFunction(
+                    functionName, inputTypes, outputType, javaType, lambdaExpression);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public String description() {
+        return "";
+    }
+
+    @Override
+    public String name() {
+        return functionName;
+    }
+}
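
A minimal sketch of the intended call path (names and values are illustrative only):

    // Wrap a lambda string as a Spark UnboundFunction, then bind it to input types;
    // bind() returns a PaimonSparkScalarFunction that compiles the lambda on first use.
    UnboundFunction unbound =
            new LambdaScalarFunction(
                    "area", DataTypes.IntegerType, "java.lang.Integer",
                    "(Integer x, Integer y) -> x * y");
    StructType inputs =
            new StructType()
                    .add("x", DataTypes.IntegerType)
                    .add("y", DataTypes.IntegerType);
    BoundFunction bound = unbound.bind(inputs);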
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/PaimonSparkScalarFunction.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/PaimonSparkScalarFunction.java
new file mode 100644
index 0000000000..1a90b72569
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/PaimonSparkScalarFunction.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.spark.utils.CatalogUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
+import org.apache.spark.sql.types.DataType;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Spark {@link ScalarFunction} that lazily compiles and invokes a Java lambda expression. */
+public class PaimonSparkScalarFunction implements ScalarFunction<Object>, Serializable {
+    private static final long serialVersionUID = 1L;
+    private final String functionName;
+    private final List<DataType> inputTypes;
+    private final DataType resultType;
+    private String javaType;
+    private String lambdaExpression;
+    private transient Method compiledMethod;
+
+    public PaimonSparkScalarFunction(
+            String functionName,
+            List<DataType> inputTypes,
+            DataType resultType,
+            String javaType,
+            String lambdaExpression) {
+        this.functionName = functionName;
+        this.inputTypes = inputTypes;
+        this.resultType = resultType;
+        this.javaType = javaType;
+        this.lambdaExpression = lambdaExpression;
+    }
+
+    @Override
+    public DataType[] inputTypes() {
+        return inputTypes.toArray(new DataType[inputTypes.size()]);
+    }
+
+    @Override
+    public DataType resultType() {
+        return resultType;
+    }
+
+    @Override
+    public Object produceResult(InternalRow input) {
+        try {
+            // Compile the lambda lazily; the Method is transient, so each executor
+            // compiles its own copy on first use.
+            if (this.compiledMethod == null) {
+                this.compiledMethod =
+                        JavaLambdaStringToMethodConverter.compileAndLoadMethod(
+                                functionName, lambdaExpression, javaType);
+            }
+            List<Object> parameters = new ArrayList<>();
+            for (int i = 0; i < inputTypes().length; i++) {
+                Object obj =
+                        CatalogUtils.convertSparkJavaToPaimonJava(
+                                inputTypes()[i], input.get(i, inputTypes()[i]));
+                parameters.add(obj);
+            }
+            return this.compiledMethod.invoke(null, parameters.toArray(new Object[0]));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to evaluate lambda function " + functionName, e);
+        }
+    }
+
+    @Override
+    public String name() {
+        return functionName;
+    }
+
+    @Override
+    public String canonicalName() {
+        return functionName;
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index c323d86dfd..40d07d906a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -23,25 +23,33 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.function.Function;
+import org.apache.paimon.function.FunctionDefinition;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.spark.catalog.SparkBaseCatalog;
-import org.apache.paimon.spark.catalog.SupportFunction;
 import org.apache.paimon.spark.catalog.SupportView;
+import org.apache.paimon.spark.catalog.functions.PaimonFunctions;
+import org.apache.paimon.spark.utils.CatalogUtils;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.FormatTableOptions;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.FieldReference;
 import org.apache.spark.sql.connector.expressions.IdentityTransform;
 import org.apache.spark.sql.connector.expressions.NamedReference;
@@ -81,10 +89,12 @@ import static 
org.apache.paimon.spark.utils.CatalogUtils.removeCatalogName;
 import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
 
 /** Spark {@link TableCatalog} for paimon. */
-public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, 
SupportView {
+public class SparkCatalog extends SparkBaseCatalog
+        implements SupportView, FunctionCatalog, SupportsNamespaces {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(SparkCatalog.class);
+    public static final String FUNCTION_DEFINITION_NAME = "spark";
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(SparkCatalog.class);
     private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
 
     protected Catalog catalog = null;
@@ -550,6 +560,75 @@ public class SparkCatalog extends SparkBaseCatalog 
implements SupportFunction, S
         }
     }
 
+    @Override
+    public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+        if (isFunctionNamespace(namespace)) {
+            List<Identifier> functionIdentifiers = new ArrayList<>();
+            PaimonFunctions.names()
+                    .forEach(name -> functionIdentifiers.add(Identifier.of(namespace, name)));
+            if (namespace.length > 0) {
+                String databaseName = getDatabaseNameFromNamespace(namespace);
+                try {
+                    catalog.listFunctions(databaseName)
+                            .forEach(name -> functionIdentifiers.add(Identifier.of(namespace, name)));
+                } catch (Catalog.DatabaseNotExistException e) {
+                    throw new NoSuchNamespaceException(namespace);
+                }
+            }
+            return functionIdentifiers.toArray(new Identifier[0]);
+        }
+        throw new NoSuchNamespaceException(namespace);
+    }
+
+    @Override
+    public UnboundFunction loadFunction(Identifier ident) throws 
NoSuchFunctionException {
+        if (isFunctionNamespace(ident.namespace())) {
+            UnboundFunction func = PaimonFunctions.load(ident.name());
+            if (func != null) {
+                return func;
+            }
+            try {
+                Function paimonFunction = catalog.getFunction(toIdentifier(ident));
+                FunctionDefinition functionDefinition =
+                        paimonFunction.definition(FUNCTION_DEFINITION_NAME);
+                if (functionDefinition instanceof FunctionDefinition.LambdaFunctionDefinition) {
+                    FunctionDefinition.LambdaFunctionDefinition lambdaFunctionDefinition =
+                            (FunctionDefinition.LambdaFunctionDefinition) functionDefinition;
+                    if (paimonFunction.returnParams().isPresent()) {
+                        List<DataField> dataFields = paimonFunction.returnParams().get();
+                        if (dataFields.size() == 1) {
+                            DataField dataField = dataFields.get(0);
+                            return new LambdaScalarFunction(
+                                    ident.name(),
+                                    CatalogUtils.paimonType2SparkType(dataField.type()),
+                                    CatalogUtils.paimonType2JavaType(dataField.type()),
+                                    lambdaFunctionDefinition.definition());
+                        } else {
+                            throw new UnsupportedOperationException(
+                                    "Functions with more than one return parameter are not supported");
+                        }
+                    }
+                }
+            } catch (Catalog.FunctionNotExistException e) {
+                throw new NoSuchFunctionException(ident);
+            }
+        }
+
+        throw new NoSuchFunctionException(ident);
+    }
+
+    private boolean isFunctionNamespace(String[] namespace) {
+        // Allow an empty namespace, as Spark's bucket join will use the `bucket` function with an
+        // empty namespace to generate transforms for partitioning.
+        // Otherwise, check whether it is a paimon namespace.
+        return namespace.length == 0 || (namespace.length == 1 && namespaceExists(namespace));
+    }
+
     private PropertyChange toPropertyChange(NamespaceChange change) {
         if (change instanceof NamespaceChange.SetProperty) {
             NamespaceChange.SetProperty set = (NamespaceChange.SetProperty) 
change;
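
With listFunctions/loadFunction in place, a lambda definition registered under the
"spark" dialect becomes callable from Spark SQL. A sketch, assuming the catalog is
registered as `paimon` and the function was created as in the tests:

    SELECT paimon.mydb.area(2, 3);

Note that only definitions with exactly one return parameter are mapped to a scalar
function; multi-field return parameters are rejected.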
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 53ae3758d5..4e2e30a0c9 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -18,15 +18,18 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.spark.procedure.AlterFunctionProcedure;
 import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
 import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
 import org.apache.paimon.spark.procedure.CompactManifestProcedure;
 import org.apache.paimon.spark.procedure.CompactProcedure;
 import org.apache.paimon.spark.procedure.CreateBranchProcedure;
+import org.apache.paimon.spark.procedure.CreateFunctionProcedure;
 import org.apache.paimon.spark.procedure.CreateTagFromTimestampProcedure;
 import org.apache.paimon.spark.procedure.CreateTagProcedure;
 import org.apache.paimon.spark.procedure.DeleteBranchProcedure;
 import org.apache.paimon.spark.procedure.DeleteTagProcedure;
+import org.apache.paimon.spark.procedure.DropFunctionProcedure;
 import org.apache.paimon.spark.procedure.ExpirePartitionsProcedure;
 import org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
 import org.apache.paimon.spark.procedure.ExpireTagsProcedure;
@@ -102,6 +105,9 @@ public class SparkProcedures {
         procedureBuilders.put("refresh_object_table", 
RefreshObjectTableProcedure::builder);
         procedureBuilders.put("clear_consumers", 
ClearConsumersProcedure::builder);
         procedureBuilders.put("alter_view_dialect", 
AlterViewDialectProcedure::builder);
+        procedureBuilders.put("create_function", 
CreateFunctionProcedure::builder);
+        procedureBuilders.put("alter_function", 
AlterFunctionProcedure::builder);
+        procedureBuilders.put("drop_function", DropFunctionProcedure::builder);
         return procedureBuilders.build();
     }
 }
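
The registry maps procedure names to builders, so the new entries become available
as `sys.create_function`, `sys.alter_function` and `sys.drop_function` in Spark SQL,
e.g. (the identifier is a placeholder):

    CALL sys.drop_function('mydb.area');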
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
deleted file mode 100644
index d3fc3528bb..0000000000
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.catalog;
-
-import org.apache.paimon.spark.catalog.functions.PaimonFunctions;
-
-import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
-import org.apache.spark.sql.connector.catalog.FunctionCatalog;
-import org.apache.spark.sql.connector.catalog.Identifier;
-import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
-
-/** Catalog methods for working with Functions. */
-public interface SupportFunction extends FunctionCatalog, SupportsNamespaces {
-
-    default boolean isFunctionNamespace(String[] namespace) {
-        // Allow for empty namespace, as Spark's bucket join will use `bucket` 
function with empty
-        // namespace to generate transforms for partitioning.
-        // Otherwise, check if it is paimon namespace.
-        return namespace.length == 0 || (namespace.length == 1 && 
namespaceExists(namespace));
-    }
-
-    @Override
-    default Identifier[] listFunctions(String[] namespace) throws 
NoSuchNamespaceException {
-        if (isFunctionNamespace(namespace)) {
-            return PaimonFunctions.names().stream()
-                    .map(name -> Identifier.of(namespace, name))
-                    .toArray(Identifier[]::new);
-        }
-
-        throw new NoSuchNamespaceException(namespace);
-    }
-
-    @Override
-    default UnboundFunction loadFunction(Identifier ident) throws 
NoSuchFunctionException {
-        if (isFunctionNamespace(ident.namespace())) {
-            UnboundFunction func = PaimonFunctions.load(ident.name());
-            if (func != null) {
-                return func;
-            }
-        }
-
-        throw new NoSuchFunctionException(ident);
-    }
-}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterFunctionProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterFunctionProcedure.java
new file mode 100644
index 0000000000..daf2a1676f
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/AlterFunctionProcedure.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.function.FunctionChange;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.spark.utils.CatalogUtils;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Alter function procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.alter_function('function_identifier', change)
+ *
+ * </code></pre>
+ */
+public class AlterFunctionProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("function", StringType),
+                ProcedureParameter.required("change", StringType)
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, 
Metadata.empty())
+                    });
+
+    protected AlterFunctionProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Catalog paimonCatalog = ((WithPaimonCatalog) 
tableCatalog()).paimonCatalog();
+        org.apache.spark.sql.connector.catalog.Identifier ident =
+                toIdentifier(args.getString(0), PARAMETERS[0].name());
+        Identifier function = CatalogUtils.toIdentifier(ident);
+        FunctionChange functionChange =
+                JsonSerdeUtil.fromJson(args.getString(1), 
FunctionChange.class);
+        try {
+            paimonCatalog.alterFunction(function, 
ImmutableList.of(functionChange), false);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return new InternalRow[] {newInternalRow(true)};
+    }
+
+    public static ProcedureBuilder builder() {
+        return new Builder<AlterFunctionProcedure>() {
+            @Override
+            public AlterFunctionProcedure doBuild() {
+                return new AlterFunctionProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "AlterFunctionProcedure";
+    }
+}
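
The `change` argument is a JSON-serialized FunctionChange. Reusing the shape from
the Flink IT case in this patch (URI and class names are placeholders):

    CALL sys.alter_function('mydb.area',
        '{"action" : "addDefinition", "name" : "flink", "definition" : {"type" : "file",
          "fileResources" : [{"resourceType": "JAR", "uri": "oss://mybucket/xxxx.jar"}],
          "language": "JAVA", "className": "xxxx", "functionName": "functionName"}}');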
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateFunctionProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateFunctionProcedure.java
new file mode 100644
index 0000000000..a2c47739ee
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateFunctionProcedure.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.function.FunctionImpl;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.spark.utils.CatalogUtils;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Create function procedure. Usage:
+ *
+ * <pre><code>
+ *  -- NOTE: use '' as placeholder for optional arguments
+ *
+ *  CALL sys.create_function('function_identifier',
+ *     '[{"id": 0, "name":"length", "type":"INT"}', '{"id": 1, "name":"width", 
"type":"INT"}]',
+ *     '[{"id": 0, "name":"area", "type":"BIGINT"]',
+ *     true, 'comment', 'k1=v1,k2=v2'
+ *    )
+ *
+ * </code></pre>
+ */
+public class CreateFunctionProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("function", StringType),
+                ProcedureParameter.required("inputParams", StringType),
+                ProcedureParameter.required("returnParams", StringType),
+                ProcedureParameter.optional("deterministic", BooleanType),
+                ProcedureParameter.optional("comment", StringType),
+                ProcedureParameter.optional("options", StringType)
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, 
Metadata.empty())
+                    });
+
+    protected CreateFunctionProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Catalog paimonCatalog = ((WithPaimonCatalog) 
tableCatalog()).paimonCatalog();
+        org.apache.spark.sql.connector.catalog.Identifier ident =
+                toIdentifier(args.getString(0), PARAMETERS[0].name());
+        Identifier function = CatalogUtils.toIdentifier(ident);
+        List<DataField> inputParams = getDataFieldsFromArguments(1, args);
+        List<DataField> returnParams = getDataFieldsFromArguments(2, args);
+        boolean deterministic = args.isNullAt(3) ? true : args.getBoolean(3);
+        String comment = args.isNullAt(4) ? null : args.getString(4);
+        String properties = args.isNullAt(5) ? null : args.getString(5);
+        Map<String, String> options = 
ParameterUtils.parseCommaSeparatedKeyValues(properties);
+        try {
+            FunctionImpl functionImpl =
+                    new FunctionImpl(
+                            function,
+                            inputParams,
+                            returnParams,
+                            deterministic,
+                            Maps.newHashMap(),
+                            comment,
+                            options);
+            paimonCatalog.createFunction(function, functionImpl, false);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return new InternalRow[] {newInternalRow(true)};
+    }
+
+    public static ProcedureBuilder builder() {
+        return new Builder<CreateFunctionProcedure>() {
+            @Override
+            public CreateFunctionProcedure doBuild() {
+                return new CreateFunctionProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "CreateFunctionProcedure";
+    }
+
+    public static List<DataField> getDataFieldsFromArguments(int position, 
InternalRow args) {
+        String data = args.isNullAt(position) ? null : 
args.getString(position);
+        return ParameterUtils.parseDataFieldArray(data);
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropFunctionProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropFunctionProcedure.java
new file mode 100644
index 0000000000..2a63f6eff6
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropFunctionProcedure.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.spark.utils.CatalogUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Drop function procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.drop_function('function_identifier')
+ *
+ * </code></pre>
+ */
+public class DropFunctionProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {ProcedureParameter.required("function", 
StringType)};
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, 
Metadata.empty())
+                    });
+
+    protected DropFunctionProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Catalog paimonCatalog = ((WithPaimonCatalog) 
tableCatalog()).paimonCatalog();
+        org.apache.spark.sql.connector.catalog.Identifier ident =
+                toIdentifier(args.getString(0), PARAMETERS[0].name());
+        Identifier function = CatalogUtils.toIdentifier(ident);
+        try {
+            paimonCatalog.dropFunction(function, false);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return new InternalRow[] {newInternalRow(true)};
+    }
+
+    public static ProcedureBuilder builder() {
+        return new Builder<DropFunctionProcedure>() {
+            @Override
+            public DropFunctionProcedure doBuild() {
+                return new DropFunctionProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "DropFunctionProcedure";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
index da2bcc10a2..ddda188d18 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
@@ -18,11 +18,41 @@
 
 package org.apache.paimon.spark.utils;
 
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.ShortType;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.ByteType;
+import static org.apache.spark.sql.types.DataTypes.DoubleType;
+import static org.apache.spark.sql.types.DataTypes.FloatType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
+import static org.apache.spark.sql.types.DataTypes.ShortType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+import static org.apache.spark.sql.types.DataTypes.TimestampType;
+import static org.apache.spark.sql.types.DataTypes.createArrayType;
 
 /** Utils of catalog. */
 public class CatalogUtils {
@@ -52,4 +82,170 @@ public class CatalogUtils {
             return ident;
         }
     }
+
+    public static org.apache.spark.sql.types.DataType paimonType2SparkType(
+            org.apache.paimon.types.DataType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return StringType;
+            case BOOLEAN:
+                return BooleanType;
+            case BINARY:
+            case VARBINARY:
+                return BinaryType;
+            case TINYINT:
+                return ByteType;
+            case SMALLINT:
+                return ShortType;
+            case INTEGER:
+                return IntegerType;
+            case BIGINT:
+                return LongType;
+            case FLOAT:
+                return FloatType;
+            case DOUBLE:
+                return DoubleType;
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) type;
+                return DataTypes.createDecimalType(
+                        decimalType.getPrecision(), decimalType.getScale());
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return TimestampType;
+            case ARRAY:
+                return createArrayType(paimonType2SparkType(((ArrayType) 
type).getElementType()));
+            case MAP:
+            case MULTISET:
+                DataType keyType;
+                DataType valueType;
+                if (type instanceof MapType) {
+                    keyType = ((MapType) type).getKeyType();
+                    valueType = ((MapType) type).getValueType();
+                } else if (type instanceof MultisetType) {
+                    keyType = ((MultisetType) type).getElementType();
+                    valueType = new IntType();
+                } else {
+                    throw new UnsupportedOperationException("Unsupported type: 
" + type);
+                }
+                return DataTypes.createMapType(
+                        paimonType2SparkType(keyType), 
paimonType2SparkType(valueType));
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + 
type);
+        }
+    }
+
+    public static String paimonType2JavaType(org.apache.paimon.types.DataType 
type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return "java.lang.String";
+            case BOOLEAN:
+                return "java.lang.Boolean";
+            case BINARY:
+            case VARBINARY:
+                return "byte[]";
+            case TINYINT:
+                return "java.lang.Byte";
+            case SMALLINT:
+                return "java.lang.Short";
+            case INTEGER:
+                return "java.lang.Integer";
+            case BIGINT:
+                return "java.lang.Long";
+            case FLOAT:
+                return "java.lang.Float";
+            case DOUBLE:
+                return "java.lang.Double";
+            case DECIMAL:
+                return "java.math.BigDecimal";
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return "java.time.Instant";
+            case ARRAY:
+                return paimonType2JavaType(((ArrayType) 
type).getElementType()) + "[]";
+            case MAP:
+            case MULTISET:
+                DataType keyType;
+                DataType valueType;
+                if (type instanceof MapType) {
+                    keyType = ((MapType) type).getKeyType();
+                    valueType = ((MapType) type).getValueType();
+                } else if (type instanceof MultisetType) {
+                    keyType = ((MultisetType) type).getElementType();
+                    valueType = new IntType();
+                } else {
+                    throw new UnsupportedOperationException("Unsupported type: 
" + type);
+                }
+                return String.format(
+                        "java.util.Map<%s,%s>",
+                        paimonType2JavaType(keyType), 
paimonType2JavaType(valueType));
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + 
type);
+        }
+    }
+
+    public static Object convertSparkJavaToPaimonJava(
+            org.apache.spark.sql.types.DataType sparkType, Object value) {
+        if (value == null) {
+            return null;
+        }
+        if (sparkType == StringType) {
+            // Spark hands StringType values over as UTF8String; toString() yields java.lang.String.
+            return value.toString();
+        } else if (sparkType == BooleanType) {
+            return (Boolean) value;
+        } else if (sparkType == BinaryType) {
+            return (byte[]) value;
+        } else if (sparkType == ByteType) {
+            return (byte) value;
+        } else if (sparkType == ShortType) {
+            return (short) value;
+        } else if (sparkType == IntegerType) {
+            return (Integer) value;
+        } else if (sparkType == LongType) {
+            return (Long) value;
+        } else if (sparkType == FloatType) {
+            return (Float) value;
+        } else if (sparkType == DoubleType) {
+            return (Double) value;
+        } else if (sparkType instanceof 
org.apache.spark.sql.types.DecimalType) {
+            return (java.math.BigDecimal) value;
+        } else if (sparkType instanceof org.apache.spark.sql.types.ArrayType) {
+            org.apache.spark.sql.types.ArrayType arrayType =
+                    (org.apache.spark.sql.types.ArrayType) sparkType;
+            List<Object> list = new ArrayList<>();
+            if (value instanceof GenericArrayData) {
+                GenericArrayData genericArray = (GenericArrayData) value;
+                Object[] array = genericArray.array();
+                for (Object elem : array) {
+                    
list.add(convertSparkJavaToPaimonJava(arrayType.elementType(), elem));
+                }
+                return list;
+            } else {
+                throw new IllegalArgumentException("Unexpected array type: " + 
value.getClass());
+            }
+        } else if (sparkType instanceof org.apache.spark.sql.types.MapType) {
+            org.apache.spark.sql.types.MapType mapType =
+                    (org.apache.spark.sql.types.MapType) sparkType;
+            if (value instanceof ArrayBasedMapData) {
+                ArrayBasedMapData arrayBasedMapData = (ArrayBasedMapData) 
value;
+                Map<Object, Object> sparkMap =
+                        ArrayBasedMapData.toJavaMap(
+                                arrayBasedMapData.keyArray().array(),
+                                arrayBasedMapData.valueArray().array());
+                Map<Object, Object> javaMap = new HashMap<>();
+                for (Map.Entry<Object, Object> entry : sparkMap.entrySet()) {
+                    Object key = 
convertSparkJavaToPaimonJava(mapType.keyType(), entry.getKey());
+                    Object val =
+                            convertSparkJavaToPaimonJava(mapType.valueType(), 
entry.getValue());
+                    javaMap.put(key, val);
+                }
+                return javaMap;
+            } else {
+                throw new IllegalArgumentException("Unexpected map type: " + value.getClass());
+            }
+        }
+
+        throw new IllegalArgumentException("Unsupported Spark data type: " + 
sparkType);
+    }
 }
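
A small sketch of the runtime conversion helper on a nested type (values are
illustrative):

    // A Spark ARRAY<INT> argument arrives as GenericArrayData and is converted to a
    // java.util.List before being passed to the compiled lambda method.
    Object converted =
            CatalogUtils.convertSparkJavaToPaimonJava(
                    DataTypes.createArrayType(DataTypes.IntegerType),
                    new GenericArrayData(new Object[] {1, 2, 3}));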
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/utils/CatalogUtilsTest.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/utils/CatalogUtilsTest.java
new file mode 100644
index 0000000000..103b1783cd
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/utils/CatalogUtilsTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.utils;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.spark.sql.types.DataTypes.createArrayType;
+import static org.apache.spark.sql.types.DataTypes.createMapType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CatalogUtils}. */
+public class CatalogUtilsTest {
+    @Test
+    void paimonType2SparkType() {
+        assertThat(CatalogUtils.paimonType2SparkType(new DecimalType(2, 1)))
+                .isEqualTo(DataTypes.createDecimalType(2, 1));
+        assertThat(CatalogUtils.paimonType2SparkType(new ArrayType(new 
IntType())))
+                
.isEqualTo(createArrayType(org.apache.spark.sql.types.DataTypes.IntegerType));
+        assertThat(CatalogUtils.paimonType2SparkType(new MapType(new 
IntType(), new VarCharType())))
+                .isEqualTo(
+                        createMapType(
+                                
org.apache.spark.sql.types.DataTypes.IntegerType,
+                                DataTypes.StringType));
+    }
+
+    @Test
+    void paimonType2JavaType() {
+        assertThat(CatalogUtils.paimonType2JavaType(new DecimalType(2, 1)))
+                .isEqualTo("java.math.BigDecimal");
+        assertThat(CatalogUtils.paimonType2JavaType(new ArrayType(new 
IntType())))
+                .isEqualTo("java.lang.Integer[]");
+        assertThat(CatalogUtils.paimonType2JavaType(new MapType(new IntType(), 
new VarCharType())))
+                
.isEqualTo("java.util.Map<java.lang.Integer,java.lang.String>");
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 444fb1024d..488913f14a 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -41,8 +41,8 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 public class SparkCatalogWithHiveTest {
 
     private static TestHiveMetastore testHiveMetastore;
-
     private static final int PORT = 9087;
+    @TempDir java.nio.file.Path tempDir;
 
     @BeforeAll
     public static void startMetastore() {
@@ -55,7 +55,102 @@ public class SparkCatalogWithHiveTest {
         testHiveMetastore.stop();
     }
 
-    @TempDir java.nio.file.Path tempDir;
+    @Test
+    public void testCreateFormatTable() {
+        try (SparkSession spark = createSessionBuilder().getOrCreate()) {
+            spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+            spark.sql("USE spark_catalog.my_db1");
+
+            // test orc table
+
+            spark.sql("CREATE TABLE IF NOT EXISTS table_orc (a INT, bb INT, c 
STRING) USING orc");
+
+            assertThat(
+                            spark.sql("SHOW TABLES").collectAsList().stream()
+                                    .map(s -> s.get(1))
+                                    .map(Object::toString))
+                    .containsExactlyInAnyOrder("table_orc");
+
+            assertThat(
+                            spark.sql("EXPLAIN EXTENDED SELECT * from 
table_orc").collectAsList()
+                                    .stream()
+                                    .map(s -> s.get(0))
+                                    .map(Object::toString)
+                                    .filter(s -> s.contains("OrcScan"))
+                                    .count())
+                    .isGreaterThan(0);
+
+            // todo: There are some bugs with Spark CSV table options. In Spark 3.x, both reading
+            // and writing use the default delimiter ',' even if we specify one. In Spark 4.x,
+            // reading is correct but writing is still incorrect, so just skip setting it for now.
+            // test csv table
+
+            spark.sql(
+                    "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c 
STRING) USING csv OPTIONS ('field-delimiter' ',')");
+            spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2, 
'2')").collect();
+            assertThat(spark.sql("DESCRIBE FORMATTED 
table_csv").collectAsList().toString())
+                    .contains("sep=,");
+            assertThat(
+                            spark.sql("SELECT * FROM 
table_csv").collectAsList().stream()
+                                    .map(Row::toString)
+                                    .collect(Collectors.toList()))
+                    .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
+
+            // test json table
+
+            spark.sql("CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c 
STRING) USING json");
+            spark.sql("INSERT INTO table_json VALUES(1, 1, '1'), (2, 2, '2')");
+            assertThat(
+                            spark.sql("SELECT * FROM 
table_json").collectAsList().stream()
+                                    .map(Row::toString)
+                                    .collect(Collectors.toList()))
+                    .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
+        }
+    }
+
+    @Test
+    public void testSpecifyHiveConfDirInGenericCatalog() {
+        try (SparkSession spark =
+                createSessionBuilder()
+                        
.config("spark.sql.catalog.spark_catalog.hive-conf-dir", "nonExistentPath")
+                        .config(
+                                "spark.sql.catalog.spark_catalog",
+                                SparkGenericCatalog.class.getName())
+                        .getOrCreate()) {
+            assertThatThrownBy(() -> spark.sql("CREATE DATABASE my_db"))
+                    .rootCause()
+                    .isInstanceOf(FileNotFoundException.class)
+                    .hasMessageContaining("nonExistentPath");
+        }
+    }
+
+    @Test
+    public void testCreateExternalTable() {
+        try (SparkSession spark = createSessionBuilder().getOrCreate()) {
+            String warehousePath = 
spark.sparkContext().conf().get("spark.sql.warehouse.dir");
+            spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
+            spark.sql("USE spark_catalog.test_db");
+
+            // create hive external table
+            spark.sql("CREATE EXTERNAL TABLE external_table (a INT, bb INT, c 
STRING)");
+
+            // drop hive external table
+            spark.sql("DROP TABLE external_table");
+
+            // file system table exists
+            assertThatCode(
+                            () ->
+                                    FileStoreTableFactory.create(
+                                            LocalFileIO.create(),
+                                            new Path(
+                                                    warehousePath,
+                                                    String.format(
+                                                            "%s.db/%s",
+                                                            "test_db", 
"external_table"))))
+                    .doesNotThrowAnyException();
+        }
+    }
 
     private SparkSession.Builder createSessionBuilder() {
         Path warehousePath = new Path("file:" + tempDir.toString());
@@ -75,102 +170,4 @@ public class SparkCatalogWithHiveTest {
                         
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
                 .master("local[2]");
     }
-
-    @Test
-    public void testCreateFormatTable() {
-        SparkSession spark = createSessionBuilder().getOrCreate();
-        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
-        spark.sql("USE spark_catalog.my_db1");
-
-        // test orc table
-
-        spark.sql("CREATE TABLE IF NOT EXISTS table_orc (a INT, bb INT, c 
STRING) USING orc");
-
-        assertThat(
-                        spark.sql("SHOW TABLES").collectAsList().stream()
-                                .map(s -> s.get(1))
-                                .map(Object::toString))
-                .containsExactlyInAnyOrder("table_orc");
-
-        assertThat(
-                        spark.sql("EXPLAIN EXTENDED SELECT * from 
table_orc").collectAsList()
-                                .stream()
-                                .map(s -> s.get(0))
-                                .map(Object::toString)
-                                .filter(s -> s.contains("OrcScan"))
-                                .count())
-                .isGreaterThan(0);
-
-        // todo: There are some bugs with Spark CSV table's options. In Spark 3.x, both reading and
-        // writing using the default delimiter value ',' even if we specific it. In Spark 4.x,
-        // reading is correct, but writing is still incorrect, just skip setting it for now.
-        // test csv table
-
-        spark.sql(
-                "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c 
STRING) USING csv OPTIONS ('field-delimiter' ',')");
-        spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2, 
'2')").collect();
-        assertThat(spark.sql("DESCRIBE FORMATTED 
table_csv").collectAsList().toString())
-                .contains("sep=,");
-        assertThat(
-                        spark.sql("SELECT * FROM 
table_csv").collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
-
-        // test json table
-
-        spark.sql("CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c 
STRING) USING json");
-        spark.sql("INSERT INTO table_json VALUES(1, 1, '1'), (2, 2, '2')");
-        assertThat(
-                        spark.sql("SELECT * FROM 
table_json").collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
-
-        spark.close();
-    }
-
-    @Test
-    public void testSpecifyHiveConfDirInGenericCatalog() {
-        SparkSession spark =
-                createSessionBuilder()
-                        .config("spark.sql.catalog.spark_catalog.hive-conf-dir", "nonExistentPath")
-                        .config(
-                                "spark.sql.catalog.spark_catalog",
-                                SparkGenericCatalog.class.getName())
-                        .getOrCreate();
-        assertThatThrownBy(() -> spark.sql("CREATE DATABASE my_db"))
-                .rootCause()
-                .isInstanceOf(FileNotFoundException.class)
-                .hasMessageContaining("nonExistentPath");
-
-        spark.close();
-    }
-
-    @Test
-    public void testCreateExternalTable() {
-        SparkSession spark = createSessionBuilder().getOrCreate();
-        String warehousePath = spark.sparkContext().conf().get("spark.sql.warehouse.dir");
-        spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
-        spark.sql("USE spark_catalog.test_db");
-
-        // create hive external table
-        spark.sql("CREATE EXTERNAL TABLE external_table (a INT, bb INT, c 
STRING)");
-
-        // drop hive external table
-        spark.sql("DROP TABLE external_table");
-
-        // file system table exists
-        assertThatCode(
-                        () ->
-                                FileStoreTableFactory.create(
-                                        LocalFileIO.create(),
-                                        new Path(
-                                                warehousePath,
-                                                String.format(
-                                                        "%s.db/%s", "test_db", "external_table"))))
-                .doesNotThrowAnyException();
-
-        spark.close();
-    }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index 4502ba0122..575eb72711 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -18,6 +18,12 @@
 
 package org.apache.paimon.spark;
 
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.function.Function;
+import org.apache.paimon.function.FunctionChange;
+import org.apache.paimon.function.FunctionDefinition;
+import org.apache.paimon.function.FunctionImpl;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.rest.RESTCatalogInternalOptions;
 import org.apache.paimon.rest.RESTCatalogServer;
@@ -25,16 +31,24 @@ import org.apache.paimon.rest.auth.AuthProvider;
 import org.apache.paimon.rest.auth.AuthProviderEnum;
 import org.apache.paimon.rest.auth.BearTokenAuthProvider;
 import org.apache.paimon.rest.responses.ConfigResponse;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.CatalogManager;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -48,6 +62,7 @@ public class SparkCatalogWithRestTest {
     private String warehouse;
     @TempDir java.nio.file.Path tempFile;
     private String initToken = "init_token";
+    private SparkSession spark;
 
     @BeforeEach
     public void before() throws IOException {
@@ -65,16 +80,7 @@ public class SparkCatalogWithRestTest {
         restCatalogServer = new RESTCatalogServer(dataPath, authProvider, config, warehouse);
         restCatalogServer.start();
         serverUrl = restCatalogServer.getUrl();
-    }
-
-    @AfterEach()
-    public void after() throws Exception {
-        restCatalogServer.shutdown();
-    }
-
-    @Test
-    public void testTable() {
-        SparkSession spark =
+        spark =
                 SparkSession.builder()
                         .config("spark.sql.catalog.paimon", 
SparkCatalog.class.getName())
                         .config("spark.sql.catalog.paimon.metastore", "rest")
@@ -89,9 +95,18 @@
                                "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
                         .master("local[2]")
                         .getOrCreate();
-
         spark.sql("CREATE DATABASE paimon.db2");
         spark.sql("USE paimon.db2");
+    }
+
+    @AfterEach()
+    public void after() throws Exception {
+        restCatalogServer.shutdown();
+        spark.close();
+    }
+
+    @Test
+    public void testTable() {
         spark.sql(
                 "CREATE TABLE t1 (a INT, b INT, c STRING) TBLPROPERTIES"
                         + " ('primary-key'='a', 'bucket'='4', 
'file.format'='avro')");
@@ -102,6 +117,141 @@ public class SparkCatalogWithRestTest {
                 .containsExactlyInAnyOrder("t1");
         spark.sql("DROP TABLE t1");
         assertThat(spark.sql("SHOW TABLES").collectAsList().size() == 0);
-        spark.close();
+    }
+
+    @Test
+    public void testFunction() throws Exception {
+        List<DataField> inputParams = new ArrayList<>();
+        Catalog paimonCatalog = getPaimonCatalog();
+        inputParams.add(new DataField(0, "length", DataTypes.INT()));
+        inputParams.add(new DataField(1, "width", DataTypes.INT()));
+        List<DataField> returnParams = new ArrayList<>();
+        returnParams.add(new DataField(0, "area", DataTypes.BIGINT()));
+        String functionName = "area_func";
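+        // The function body is a Java lambda supplied as source text; the Spark
+        // engine compiles it into an invokable method (see LambdaScalarFunction).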
+        FunctionDefinition definition =
+                FunctionDefinition.lambda(
+                        "(Integer length, Integer width) -> { return (long) length * width; }",
+                        "JAVA");
+        Identifier identifier = Identifier.create("db2", functionName);
+        Function function =
+                new FunctionImpl(
+                        identifier,
+                        inputParams,
+                        returnParams,
+                        false,
+                        ImmutableMap.of(SparkCatalog.FUNCTION_DEFINITION_NAME, definition),
+                        null,
+                        null);
+        paimonCatalog.createFunction(identifier, function, false);
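+        // Once created, the function is callable from Spark SQL by its qualified name.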
+        assertThat(
+                        spark.sql(String.format("select paimon.db2.%s(1, 2)", functionName))
+                                .collectAsList()
+                                .get(0)
+                                .toString())
+                .isEqualTo("[2]");
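+        // Swap in a new lambda body via alterFunction and verify the updated result.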
+        definition =
+                FunctionDefinition.lambda(
+                        "(Integer length, Integer width) -> { return length * width + 1L; }",
+                        "JAVA");
+        paimonCatalog.alterFunction(
+                identifier,
+                ImmutableList.of(
+                        FunctionChange.updateDefinition(
+                                SparkCatalog.FUNCTION_DEFINITION_NAME, definition)),
+                false);
+        assertThat(
+                        spark.sql(String.format("select paimon.db2.%s(1, 2)", functionName))
+                                .collectAsList()
+                                .get(0)
+                                .toString())
+                .isEqualTo("[3]");
+        paimonCatalog.dropFunction(identifier, false);
+        cleanFunction(functionName);
+    }
+
+    @Test
+    public void testArrayFunction() throws Exception {
+        List<DataField> inputParams = new ArrayList<>();
+        Catalog paimonCatalog = getPaimonCatalog();
+        inputParams.add(new DataField(0, "x", 
DataTypes.ARRAY(DataTypes.INT())));
+        List<DataField> returnParams = new ArrayList<>();
+        returnParams.add(new DataField(0, "y", DataTypes.INT()));
+        String functionName = "test";
+        Identifier identifier = Identifier.create("db2", functionName);
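+        // Paimon ARRAY arguments reach the Java lambda as java.util.List values.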
+        FunctionDefinition definition =
+                FunctionDefinition.lambda(
+                        "(java.util.List<java.util.List<Integer>> x) -> x.size()", "JAVA");
+        Function function =
+                new FunctionImpl(
+                        identifier,
+                        inputParams,
+                        returnParams,
+                        false,
+                        ImmutableMap.of(SparkCatalog.FUNCTION_DEFINITION_NAME, definition),
+                        null,
+                        null);
+        paimonCatalog.createFunction(identifier, function, false);
+        assertThat(
+                        spark.sql(
+                                        String.format(
+                                                "select paimon.db2.%s(array(array(1, 2, 3), array(1, 2, 3)))",
+                                                functionName))
+                                .collectAsList()
+                                .get(0)
+                                .toString())
+                .isEqualTo("[2]");
+        cleanFunction(functionName);
+    }
+
+    @Test
+    public void testMapFunction() throws Exception {
+        List<DataField> inputParams = new ArrayList<>();
+        Catalog paimonCatalog = getPaimonCatalog();
+        inputParams.add(new DataField(0, "x", DataTypes.MAP(DataTypes.INT(), 
DataTypes.INT())));
+        List<DataField> returnParams = new ArrayList<>();
+        returnParams.add(new DataField(0, "y", DataTypes.INT()));
+        String functionName = "test";
+        Identifier identifier = Identifier.create("db2", functionName);
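+        // Paimon MAP arguments reach the Java lambda as java.util.Map values.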
+        FunctionDefinition definition =
+                FunctionDefinition.lambda(
+                        "(java.util.Map<Integer, Integer> x) -> x.size()", "JAVA");
+        Function function =
+                new FunctionImpl(
+                        identifier,
+                        inputParams,
+                        returnParams,
+                        false,
+                        ImmutableMap.of(SparkCatalog.FUNCTION_DEFINITION_NAME, definition),
+                        null,
+                        null);
+        paimonCatalog.createFunction(identifier, function, false);
+        assertThat(
+                        spark.sql(String.format("select paimon.db2.%s(map(1, 2))", functionName))
+                                .collectAsList()
+                                .get(0)
+                                .toString())
+                .isEqualTo("[1]");
+        cleanFunction(functionName);
+    }
+
+    private Catalog getPaimonCatalog() {
+        CatalogManager catalogManager = spark.sessionState().catalogManager();
+        WithPaimonCatalog withPaimonCatalog = (WithPaimonCatalog) catalogManager.currentCatalog();
+        return withPaimonCatalog.paimonCatalog();
+    }
+
+    private void cleanFunction(String functionName) {
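+        // Delete the Java source file generated for the compiled lambda so
+        // repeated test runs start from a clean state.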
+        String functionFilePath =
+                SparkCatalogWithRestTest.class
+                        .getProtectionDomain()
+                        .getCodeSource()
+                        .getLocation()
+                        .getPath()
+                        .toString()
+                        .replaceAll(
+                                "target/test-classes/",
+                                JavaLambdaStringToMethodConverter.getSourceFileName(functionName));
+        File file = new File(functionFilePath);
+        file.delete();
     }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
new file mode 100644
index 0000000000..b52022b89a
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonRestCatalogSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
+
+/** Test for Function Procedure. */
+class FunctionProcedureTest extends PaimonRestCatalogSparkTestBase {
+  test(s"test function procedure") {
+    val functionName = "function_test"
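+    // create_function takes the function name, JSON-encoded input and return
+    // parameter lists, a deterministic flag, a comment, and an options string.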
+    checkAnswer(
+      spark.sql(s"CALL sys.create_function('$functionName', " +
+        "'[{\"id\": 0, \"name\":\"length\", \"type\":\"INT\"}, {\"id\": 1, \"name\":\"width\", \"type\":\"INT\"}]'," +
+        " '[{\"id\": 0, \"name\":\"area\", \"type\":\"BIGINT\"}]', true, 'comment', 'k1=v1,k2=v2')"),
+      Row(true)
+    );
+    assertThat(
+      spark
+        .sql("SHOW FUNCTIONS")
+        .collect()
+        .map(r => r.getString(0))
+        .filter(v => v == s"paimon.test.$functionName")
+        .size).isEqualTo(1)
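+    // alter_function applies a JSON-encoded FunctionChange; this one adds a Java
+    // lambda definition named "spark" so the function becomes callable from SQL.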
+    checkAnswer(
+      spark.sql(s"CALL sys.alter_function('$functionName', " +
+        "'{\"action\" : \"addDefinition\", \"name\" : \"spark\", \"definition\" : {\"type\" : \"lambda\", \"definition\" : \"(Integer length, Integer width) -> { return (long) length * width; }\", \"language\": \"JAVA\" } }')"),
+      Row(true)
+    );
+    checkAnswer(
+      spark.sql(s"select paimon.test.$functionName(1, 2)"),
+      Row(2)
+    );
+    checkAnswer(
+      spark.sql(s"CALL sys.drop_function('$functionName')"),
+      Row(true)
+    );
+    assertThat(
+      spark
+        .sql("SHOW FUNCTIONS")
+        .collect()
+        .map(r => r.getString(0))
+        .filter(v => v == s"paimon.test.$functionName")
+        .size).isEqualTo(0);
+  }
+}
