This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 397b67353b [spark] Add v1 function support to SparkCatalog (#6075)
397b67353b is described below

commit 397b67353bc520aee25b524da38fbdf3b031beac
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Aug 15 09:13:56 2025 +0800

    [spark] Add v1 function support to SparkCatalog (#6075)
---
 .gitignore                                         |   1 +
 docs/content/concepts/functions.md                 |  35 +---
 docs/content/spark/sql-functions.md                |  71 +++++++-
 .../generated/spark_catalog_configuration.html     |   6 +
 .../java/org/apache/paimon/function/Function.java  |   3 +
 .../org/apache/paimon/function/FunctionImpl.java   |  49 +++++-
 paimon-spark/paimon-spark-3.2/pom.xml              |  18 ++
 .../extensions/RewritePaimonFunctionCommands.scala |  32 +---
 paimon-spark/paimon-spark-3.3/pom.xml              |  19 +++
 .../extensions/RewritePaimonFunctionCommands.scala |  32 +---
 paimon-spark/paimon-spark-3.4/pom.xml              |  19 +++
 .../src/test/resources/function/hive-test-udfs.jar | Bin 0 -> 35660 bytes
 .../paimon/spark/sql/PaimonV1FunctionTest.scala    |  30 +---
 paimon-spark/paimon-spark-3.5/pom.xml              |  19 +++
 .../src/test/resources/function/hive-test-udfs.jar | Bin 0 -> 35660 bytes
 .../paimon/spark/sql/PaimonV1FunctionTest.scala    |  30 +---
 paimon-spark/paimon-spark-4.0/pom.xml              |  19 +++
 .../src/test/resources/function/hive-test-udfs.jar | Bin 0 -> 35660 bytes
 .../paimon/spark/sql/PaimonV1FunctionTest.scala    |  30 +---
 paimon-spark/paimon-spark-common/pom.xml           |  12 ++
 .../java/org/apache/paimon/spark/SparkCatalog.java |  85 +++++++++-
 .../apache/paimon/spark/SparkCatalogOptions.java   |   6 +
 .../paimon/spark/catalog/SupportV1Function.java    |  51 ++++++
 .../spark/catalog/functions/PaimonFunctions.scala  |   4 +-
 .../catalog/functions/V1FunctionConverter.scala    |  81 +++++++++
 .../catalog/functions/V1FunctionRegistry.scala     | 177 ++++++++++++++++++++
 .../catalyst/analysis/PaimonFunctionResolver.scala |  45 +++++
 .../spark/execution/PaimonFunctionExec.scala       | 101 +++++++++++
 .../extensions/PaimonSparkSessionExtensions.scala  |   3 +-
 .../org/apache/paimon/spark/util/OptionUtils.scala |   6 +-
 .../scala/org/apache/spark/sql/PaimonUtils.scala   |   8 +
 .../AbstractPaimonSparkSqlExtensionsParser.scala   |  16 +-
 .../extensions/RewritePaimonFunctionCommands.scala | 174 +++++++++++++++++++
 .../apache/spark/sql/paimon/shims/ClassicApi.scala |   2 +
 .../src/test/resources/function/hive-test-udfs.jar | Bin 0 -> 35660 bytes
 ...la => PaimonSparkTestWithRestCatalogBase.scala} |   8 +-
 .../procedure/AlterViewDialectProcedureTest.scala  |   5 +-
 .../spark/procedure/FunctionProcedureTest.scala    |  17 +-
 .../spark/sql/PaimonV1FunctionTestBase.scala       | 184 +++++++++++++++++++++
 .../spark/sql/paimon/shims/Classic3Api.scala       |   5 +
 .../spark/sql/paimon/shims/Classic4Api.scala       |   6 +
 paimon-spark/pom.xml                               |  15 ++
 42 files changed, 1215 insertions(+), 209 deletions(-)

diff --git a/.gitignore b/.gitignore
index c4c2d54fe9..50d63fc69f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ target
 *.iml
 *.swp
 *.jar
+!**/resources/**/*.jar
 *.log
 *.pyc
 *.ipr
diff --git a/docs/content/concepts/functions.md 
b/docs/content/concepts/functions.md
index 11d05f590b..e66cd07762 100644
--- a/docs/content/concepts/functions.md
+++ b/docs/content/concepts/functions.md
@@ -84,37 +84,6 @@ DROP FUNCTION mydb.parse_str;
 
 This statement deletes the existing `parse_str` function from the `mydb` 
database, relinquishing its functionality.
 
-## Lambda Function Usage in Spark
+## Functions in Spark
 
-### Create Function
-
-```sql
--- Spark SQL
-CALL sys.create_function(`function` => 'my_db.area_func',
-  `inputParams` => '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, 
"name":"width", "type":"INT"}]',
-  `returnParams` => '[{"id": 0, "name":"area", "type":"BIGINT"}]',
-  `deterministic` => true,
-  `comment` => 'comment',
-  `options` => 'k1=v1,k2=v2'
-);
-```
-
-### Alter Function
-
-```sql
--- Spark SQL
-CALL sys.alter_function(`function` => 'my_db.area_func',
-  `change` => '{"action" : "addDefinition", "name" : "spark", "definition" : 
{"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return 
(long) length * width; }", "language": "JAVA" } }'
-);
-```
-```sql
--- Spark SQL
-select paimon.my_db.area_func(1, 2);
-```
-
-### Drop Function
-
-```sql
--- Spark SQL
-CALL sys.drop_function(`function` => 'my_db.area_func');
-```
+see [SQL Functions]({{< ref "spark/sql-functions#user-defined-function" >}})
\ No newline at end of file
diff --git a/docs/content/spark/sql-functions.md 
b/docs/content/spark/sql-functions.md
index eda7eb0654..3c280c30ff 100644
--- a/docs/content/spark/sql-functions.md
+++ b/docs/content/spark/sql-functions.md
@@ -29,7 +29,9 @@ under the License.
 This section introduce all available Paimon Spark functions.
 
 
-## max_pt
+## Built-in Function
+
+### max_pt
 
 `max_pt($table_name)`
 
@@ -44,12 +46,69 @@ It would throw exception when:
 
 **Example**
 
-```shell
-> SELECT max_pt('t');
- 20250101
+```sql
+SELECT max_pt('t');
+-- 20250101
  
-> SELECT * FROM t where pt = max_pt('t');
- a, 20250101
+SELECT * FROM t where pt = max_pt('t');
+-- a, 20250101
 ```
 
 **Since: 1.1.0**
+
+## User-defined Function
+
+Paimon Spark supports two types of user-defined functions: lambda functions 
and file-based functions.
+
+This feature currently only supports the REST catalog.
+
+### Lambda Function
+
+Empowering users to define functions using Java lambda expressions, enabling 
inline, concise, and functional-style operations.
+
+**Example**
+
+```sql
+-- Create Function
+CALL sys.create_function(`function` => 'my_db.area_func',
+  `inputParams` => '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, 
"name":"width", "type":"INT"}]',
+  `returnParams` => '[{"id": 0, "name":"area", "type":"BIGINT"}]',
+  `deterministic` => true,
+  `comment` => 'comment',
+  `options` => 'k1=v1,k2=v2'
+);
+
+-- Alter Function
+CALL sys.alter_function(`function` => 'my_db.area_func',
+  `change` => '{"action" : "addDefinition", "name" : "spark", "definition" : 
{"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return 
(long) length * width; }", "language": "JAVA" } }'
+);
+
+-- Drop Function
+CALL sys.drop_function(`function` => 'my_db.area_func');
+```
+
+### File Function
+
+Users can define functions within a file, providing flexibility and modular 
support for function definition, only supports jar files now.
+
+This feature requires Spark 3.4 or higher.
+
+**Example**
+
+```sql
+-- Create Function
+CREATE FUNCTION mydb.simple_udf
+AS 'com.example.SimpleUdf' 
+USING JAR '/tmp/SimpleUdf.jar' [, JAR '/tmp/SimpleUdfR.jar'];
+
+-- Create or Replace Function
+CREATE OR REPLACE FUNCTION mydb.simple_udf 
+AS 'com.example.SimpleUdf'
+USING JAR '/tmp/SimpleUdf.jar';
+       
+-- Describe Function
+DESCRIBE FUNCTION [EXTENDED] mydb.simple_udf;
+
+-- Drop Function
+DROP FUNCTION mydb.simple_udf;
+```
diff --git a/docs/layouts/shortcodes/generated/spark_catalog_configuration.html 
b/docs/layouts/shortcodes/generated/spark_catalog_configuration.html
index a3431959a1..c6d22093de 100644
--- a/docs/layouts/shortcodes/generated/spark_catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_catalog_configuration.html
@@ -38,5 +38,11 @@ under the License.
             <td>String</td>
             <td>The default database name.</td>
         </tr>
+        <tr>
+            <td><h5>v1Function.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to enable v1 function.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java 
b/paimon-api/src/main/java/org/apache/paimon/function/Function.java
index b5d01e63c1..34a5b7e2ac 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++ b/paimon-api/src/main/java/org/apache/paimon/function/Function.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.function;
 
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.types.DataField;
 
 import java.util.List;
@@ -31,6 +32,8 @@ public interface Function {
 
     String fullName();
 
+    Identifier identifier();
+
     Optional<List<DataField>> inputParams();
 
     Optional<List<DataField>> returnParams();
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java 
b/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
index f617aa3a97..44b2e2246c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
+++ b/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
@@ -21,8 +21,12 @@ package org.apache.paimon.function;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.types.DataField;
 
+import javax.annotation.Nullable;
+
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 /** Function implementation. */
@@ -30,25 +34,25 @@ public class FunctionImpl implements Function {
 
     private final Identifier identifier;
 
-    private final List<DataField> inputParams;
+    @Nullable private final List<DataField> inputParams;
 
-    private final List<DataField> returnParams;
+    @Nullable private final List<DataField> returnParams;
 
     private final boolean deterministic;
 
     private final Map<String, FunctionDefinition> definitions;
 
-    private final String comment;
+    @Nullable private final String comment;
 
     private final Map<String, String> options;
 
     public FunctionImpl(
             Identifier identifier,
-            List<DataField> inputParams,
-            List<DataField> returnParams,
+            @Nullable List<DataField> inputParams,
+            @Nullable List<DataField> returnParams,
             boolean deterministic,
             Map<String, FunctionDefinition> definitions,
-            String comment,
+            @Nullable String comment,
             Map<String, String> options) {
         this.identifier = identifier;
         this.inputParams = inputParams;
@@ -66,7 +70,7 @@ public class FunctionImpl implements Function {
         this.deterministic = true;
         this.definitions = definitions;
         this.comment = null;
-        this.options = null;
+        this.options = Collections.emptyMap();
     }
 
     @Override
@@ -79,6 +83,10 @@ public class FunctionImpl implements Function {
         return identifier.getFullName();
     }
 
+    public Identifier identifier() {
+        return identifier;
+    }
+
     @Override
     public Optional<List<DataField>> inputParams() {
         return Optional.ofNullable(inputParams);
@@ -113,4 +121,31 @@ public class FunctionImpl implements Function {
     public Map<String, String> options() {
         return options;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        FunctionImpl function = (FunctionImpl) o;
+        return deterministic == function.deterministic
+                && Objects.equals(identifier, function.identifier)
+                && Objects.equals(inputParams, function.inputParams)
+                && Objects.equals(returnParams, function.returnParams)
+                && Objects.equals(definitions, function.definitions)
+                && Objects.equals(comment, function.comment)
+                && Objects.equals(options, function.options);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                identifier,
+                inputParams,
+                returnParams,
+                deterministic,
+                definitions,
+                comment,
+                options);
+    }
 }
diff --git a/paimon-spark/paimon-spark-3.2/pom.xml 
b/paimon-spark/paimon-spark-3.2/pom.xml
index 11b6e61ff1..cc62cde3b3 100644
--- a/paimon-spark/paimon-spark-3.2/pom.xml
+++ b/paimon-spark/paimon-spark-3.2/pom.xml
@@ -61,6 +61,18 @@ under the License.
             <version>${spark.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-bundle</artifactId>
@@ -114,6 +126,12 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
similarity index 60%
copy from paimon-api/src/main/java/org/apache/paimon/function/Function.java
copy to 
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
index b5d01e63c1..4698e48a09 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
@@ -16,32 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.function;
+package org.apache.spark.sql.catalyst.parser.extensions
 
-import org.apache.paimon.types.DataField;
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+case class RewritePaimonFunctionCommands(spark: SparkSession) extends 
Rule[LogicalPlan] {
 
-/** Interface for function. */
-public interface Function {
-
-    String name();
-
-    String fullName();
-
-    Optional<List<DataField>> inputParams();
-
-    Optional<List<DataField>> returnParams();
-
-    boolean isDeterministic();
-
-    Map<String, FunctionDefinition> definitions();
-
-    FunctionDefinition definition(String name);
-
-    String comment();
-
-    Map<String, String> options();
+  // do nothing
+  override def apply(plan: LogicalPlan): LogicalPlan = plan
 }
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml 
b/paimon-spark/paimon-spark-3.3/pom.xml
index 4ced2186cb..22e4cf269c 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.3/pom.xml
@@ -61,6 +61,18 @@ under the License.
             <version>${spark.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-bundle</artifactId>
@@ -113,6 +125,13 @@ under the License.
             <version>${spark.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
similarity index 60%
copy from paimon-api/src/main/java/org/apache/paimon/function/Function.java
copy to 
paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
index b5d01e63c1..4698e48a09 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
@@ -16,32 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.function;
+package org.apache.spark.sql.catalyst.parser.extensions
 
-import org.apache.paimon.types.DataField;
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+case class RewritePaimonFunctionCommands(spark: SparkSession) extends 
Rule[LogicalPlan] {
 
-/** Interface for function. */
-public interface Function {
-
-    String name();
-
-    String fullName();
-
-    Optional<List<DataField>> inputParams();
-
-    Optional<List<DataField>> returnParams();
-
-    boolean isDeterministic();
-
-    Map<String, FunctionDefinition> definitions();
-
-    FunctionDefinition definition(String name);
-
-    String comment();
-
-    Map<String, String> options();
+  // do nothing
+  override def apply(plan: LogicalPlan): LogicalPlan = plan
 }
diff --git a/paimon-spark/paimon-spark-3.4/pom.xml 
b/paimon-spark/paimon-spark-3.4/pom.xml
index 2a9b72915a..14d39806e7 100644
--- a/paimon-spark/paimon-spark-3.4/pom.xml
+++ b/paimon-spark/paimon-spark-3.4/pom.xml
@@ -60,6 +60,18 @@ under the License.
             <version>${spark.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-bundle</artifactId>
@@ -112,6 +124,13 @@ under the License.
             <version>${spark.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/resources/function/hive-test-udfs.jar 
b/paimon-spark/paimon-spark-3.4/src/test/resources/function/hive-test-udfs.jar
new file mode 100644
index 0000000000..a5bfa456f6
Binary files /dev/null and 
b/paimon-spark/paimon-spark-3.4/src/test/resources/function/hive-test-udfs.jar 
differ
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
similarity index 59%
copy from paimon-api/src/main/java/org/apache/paimon/function/Function.java
copy to 
paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
index b5d01e63c1..f37fbad270 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
@@ -16,32 +16,6 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.function;
+package org.apache.paimon.spark.sql
 
-import org.apache.paimon.types.DataField;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/** Interface for function. */
-public interface Function {
-
-    String name();
-
-    String fullName();
-
-    Optional<List<DataField>> inputParams();
-
-    Optional<List<DataField>> returnParams();
-
-    boolean isDeterministic();
-
-    Map<String, FunctionDefinition> definitions();
-
-    FunctionDefinition definition(String name);
-
-    String comment();
-
-    Map<String, String> options();
-}
+class PaimonV1FunctionTest extends PaimonV1FunctionTestBase {}
diff --git a/paimon-spark/paimon-spark-3.5/pom.xml 
b/paimon-spark/paimon-spark-3.5/pom.xml
index 4be60e17a8..ee87e3e8da 100644
--- a/paimon-spark/paimon-spark-3.5/pom.xml
+++ b/paimon-spark/paimon-spark-3.5/pom.xml
@@ -60,6 +60,18 @@ under the License.
             <version>${spark.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-bundle</artifactId>
@@ -139,6 +151,13 @@ under the License.
             </exclusions>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/resources/function/hive-test-udfs.jar 
b/paimon-spark/paimon-spark-3.5/src/test/resources/function/hive-test-udfs.jar
new file mode 100644
index 0000000000..a5bfa456f6
Binary files /dev/null and 
b/paimon-spark/paimon-spark-3.5/src/test/resources/function/hive-test-udfs.jar 
differ
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
similarity index 59%
copy from paimon-api/src/main/java/org/apache/paimon/function/Function.java
copy to 
paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
index b5d01e63c1..f37fbad270 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
@@ -16,32 +16,6 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.function;
+package org.apache.paimon.spark.sql
 
-import org.apache.paimon.types.DataField;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/** Interface for function. */
-public interface Function {
-
-    String name();
-
-    String fullName();
-
-    Optional<List<DataField>> inputParams();
-
-    Optional<List<DataField>> returnParams();
-
-    boolean isDeterministic();
-
-    Map<String, FunctionDefinition> definitions();
-
-    FunctionDefinition definition(String name);
-
-    String comment();
-
-    Map<String, String> options();
-}
+class PaimonV1FunctionTest extends PaimonV1FunctionTestBase {}
diff --git a/paimon-spark/paimon-spark-4.0/pom.xml 
b/paimon-spark/paimon-spark-4.0/pom.xml
index 2087136d87..50158bbeb6 100644
--- a/paimon-spark/paimon-spark-4.0/pom.xml
+++ b/paimon-spark/paimon-spark-4.0/pom.xml
@@ -60,6 +60,18 @@ under the License.
             <version>${spark.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-bundle</artifactId>
@@ -111,6 +123,13 @@ under the License.
             <version>${spark.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-spark/paimon-spark-4.0/src/test/resources/function/hive-test-udfs.jar 
b/paimon-spark/paimon-spark-4.0/src/test/resources/function/hive-test-udfs.jar
new file mode 100644
index 0000000000..a5bfa456f6
Binary files /dev/null and 
b/paimon-spark/paimon-spark-4.0/src/test/resources/function/hive-test-udfs.jar 
differ
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
similarity index 59%
copy from paimon-api/src/main/java/org/apache/paimon/function/Function.java
copy to 
paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
index b5d01e63c1..f37fbad270 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++ 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
@@ -16,32 +16,6 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.function;
+package org.apache.paimon.spark.sql
 
-import org.apache.paimon.types.DataField;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/** Interface for function. */
-public interface Function {
-
-    String name();
-
-    String fullName();
-
-    Optional<List<DataField>> inputParams();
-
-    Optional<List<DataField>> returnParams();
-
-    boolean isDeterministic();
-
-    Map<String, FunctionDefinition> definitions();
-
-    FunctionDefinition definition(String name);
-
-    String comment();
-
-    Map<String, String> options();
-}
+class PaimonV1FunctionTest extends PaimonV1FunctionTestBase {}
diff --git a/paimon-spark/paimon-spark-common/pom.xml 
b/paimon-spark/paimon-spark-common/pom.xml
index 20cc34ff63..ad352e32d3 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -50,6 +50,18 @@ under the License.
             <version>${spark.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.antlr</groupId>
             <artifactId>antlr4-runtime</artifactId>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 1f32e04903..53dc85eb44 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -22,16 +22,21 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.DelegateCatalog;
 import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionDefinition;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalog;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.spark.catalog.FormatTableCatalog;
 import org.apache.paimon.spark.catalog.SparkBaseCatalog;
+import org.apache.paimon.spark.catalog.SupportV1Function;
 import org.apache.paimon.spark.catalog.SupportView;
 import org.apache.paimon.spark.catalog.functions.PaimonFunctions;
+import org.apache.paimon.spark.catalog.functions.V1FunctionConverter;
+import org.apache.paimon.spark.catalog.functions.V1FunctionRegistry;
 import org.apache.paimon.spark.utils.CatalogUtils;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.FormatTableOptions;
@@ -42,11 +47,14 @@ import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.spark.sql.PaimonSparkSession$;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.FunctionIdentifier;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction;
+import org.apache.spark.sql.catalyst.expressions.Expression;
 import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
@@ -73,6 +81,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -81,6 +91,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import scala.Option;
+import scala.collection.Seq;
+
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
@@ -98,29 +111,41 @@ import static 
org.apache.paimon.spark.utils.CatalogUtils.toUpdateColumnDefaultVa
 
 /** Spark {@link TableCatalog} for paimon. */
 public class SparkCatalog extends SparkBaseCatalog
-        implements SupportView, FunctionCatalog, SupportsNamespaces, 
FormatTableCatalog {
+        implements SupportView,
+                SupportV1Function,
+                FunctionCatalog,
+                SupportsNamespaces,
+                FormatTableCatalog {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SparkCatalog.class);
 
     public static final String FUNCTION_DEFINITION_NAME = "spark";
     private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
 
-    protected Catalog catalog = null;
-
+    private Catalog catalog;
     private String defaultDatabase;
+    private boolean v1FunctionEnabled;
+    @Nullable private V1FunctionRegistry v1FunctionRegistry;
 
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
         checkRequiredConfigurations();
-
+        SparkSession sparkSession = PaimonSparkSession$.MODULE$.active();
         this.catalogName = name;
         CatalogContext catalogContext =
                 CatalogContext.create(
-                        Options.fromMap(options),
-                        
PaimonSparkSession$.MODULE$.active().sessionState().newHadoopConf());
+                        Options.fromMap(options), 
sparkSession.sessionState().newHadoopConf());
         this.catalog = CatalogFactory.createCatalog(catalogContext);
         this.defaultDatabase =
                 options.getOrDefault(DEFAULT_DATABASE.key(), 
DEFAULT_DATABASE.defaultValue());
+        this.v1FunctionEnabled =
+                options.getBoolean(
+                                SparkCatalogOptions.V1FUNCTION_ENABLED.key(),
+                                
SparkCatalogOptions.V1FUNCTION_ENABLED.defaultValue())
+                        && DelegateCatalog.rootCatalog(catalog) instanceof 
RESTCatalog;
+        if (v1FunctionEnabled) {
+            this.v1FunctionRegistry = new V1FunctionRegistry(sparkSession);
+        }
         try {
             catalog.getDatabase(defaultDatabase);
         } catch (Catalog.DatabaseNotExistException e) {
@@ -616,9 +641,7 @@ public class SparkCatalog extends SparkBaseCatalog
                 Function paimonFunction = 
catalog.getFunction(toIdentifier(ident));
                 FunctionDefinition functionDefinition =
                         paimonFunction.definition(FUNCTION_DEFINITION_NAME);
-                if (functionDefinition != null
-                        && functionDefinition
-                                instanceof 
FunctionDefinition.LambdaFunctionDefinition) {
+                if (functionDefinition instanceof 
FunctionDefinition.LambdaFunctionDefinition) {
                     FunctionDefinition.LambdaFunctionDefinition 
lambdaFunctionDefinition =
                             (FunctionDefinition.LambdaFunctionDefinition) 
functionDefinition;
                     if (paimonFunction.returnParams().isPresent()) {
@@ -651,6 +674,50 @@ public class SparkCatalog extends SparkBaseCatalog
         return namespace.length == 0 || (namespace.length == 1 && 
namespaceExists(namespace));
     }
 
+    private V1FunctionRegistry v1FunctionRegistry() {
+        assert v1FunctionRegistry != null;
+        return v1FunctionRegistry;
+    }
+
+    @Override
+    public boolean v1FunctionEnabled() {
+        return v1FunctionEnabled;
+    }
+
+    @Override
+    public Function getV1Function(FunctionIdentifier funcIdent) throws 
Exception {
+        return 
paimonCatalog().getFunction(V1FunctionConverter.fromFunctionIdentifier(funcIdent));
+    }
+
+    @Override
+    public void createV1Function(CatalogFunction v1Function, boolean 
ignoreIfExists)
+            throws Exception {
+        Function paimonFunction = 
V1FunctionConverter.fromV1Function(v1Function);
+        paimonCatalog()
+                .createFunction(
+                        
V1FunctionConverter.fromFunctionIdentifier(v1Function.identifier()),
+                        paimonFunction,
+                        ignoreIfExists);
+    }
+
+    @Override
+    public boolean v1FunctionRegistered(FunctionIdentifier funcIdent) {
+        return v1FunctionRegistry().isRegistered(funcIdent);
+    }
+
+    @Override
+    public Expression registerAndResolveV1Function(
+            FunctionIdentifier funcIdent, Option<Function> func, 
Seq<Expression> arguments) {
+        return v1FunctionRegistry().registerAndResolveFunction(funcIdent, 
func, arguments.toSeq());
+    }
+
+    @Override
+    public void dropV1Function(FunctionIdentifier funcIdent, boolean ifExists) 
throws Exception {
+        v1FunctionRegistry().unregisterFunction(funcIdent);
+        paimonCatalog()
+                
.dropFunction(V1FunctionConverter.fromFunctionIdentifier(funcIdent), ifExists);
+    }
+
     private PropertyChange toPropertyChange(NamespaceChange change) {
         if (change instanceof NamespaceChange.SetProperty) {
             NamespaceChange.SetProperty set = (NamespaceChange.SetProperty) 
change;
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java
index d990d4c864..f069e00d48 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java
@@ -37,4 +37,10 @@ public class SparkCatalogOptions {
                     .stringType()
                     .defaultValue(Catalog.DEFAULT_DATABASE)
                     .withDescription("The default database name.");
+
+    public static final ConfigOption<Boolean> V1FUNCTION_ENABLED =
+            key("v1Function.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether to enable v1 function.");
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
new file mode 100644
index 0000000000..2506713ef4
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalog;
+
+import org.apache.paimon.function.Function;
+
+import org.apache.spark.sql.catalyst.FunctionIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+
+import scala.Option;
+import scala.collection.Seq;
+
+/** Catalog supports v1 function. */
+public interface SupportV1Function extends WithPaimonCatalog {
+
+    boolean v1FunctionEnabled();
+
+    /** Look up the function in the catalog. */
+    Function getV1Function(FunctionIdentifier funcIdent) throws Exception;
+
+    void createV1Function(CatalogFunction v1Function, boolean ignoreIfExists) 
throws Exception;
+
+    boolean v1FunctionRegistered(FunctionIdentifier funcIdent);
+
+    /**
+     * Register the function and resolves it to an Expression if not 
registered, otherwise returns
+     * the registered Expression.
+     */
+    Expression registerAndResolveV1Function(
+            FunctionIdentifier funcIdent, Option<Function> func, 
Seq<Expression> arguments);
+
+    /** Unregister the func first, then drop it. */
+    void dropV1Function(FunctionIdentifier funcIdent, boolean ifExists) throws 
Exception;
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index f04edf88c4..2876f7e7f1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.catalog.functions
 import org.apache.paimon.CoreOptions.BucketFunctionType
 import org.apache.paimon.bucket
 import org.apache.paimon.data.serializer.InternalRowSerializer
-import 
org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableList, 
ImmutableMap}
+import 
org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap, 
ImmutableSet}
 import org.apache.paimon.spark.SparkInternalRowWrapper
 import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
 import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
@@ -61,7 +61,7 @@ object PaimonFunctions {
     MOD_BUCKET
   )
 
-  val names: ImmutableList[String] = FUNCTIONS.keySet.asList()
+  val names: ImmutableSet[String] = FUNCTIONS.keySet
 
   def bucketFunctionName(funcType: BucketFunctionType): String = 
TYPE_FUNC_MAPPING.get(funcType)
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionConverter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionConverter.scala
new file mode 100644
index 0000000000..be4daa86b9
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionConverter.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalog.functions
+
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.function.{Function, FunctionDefinition, FunctionImpl}
+import org.apache.paimon.spark.SparkCatalog.FUNCTION_DEFINITION_NAME
+
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, 
FunctionResource, FunctionResourceType}
+
+import scala.collection.JavaConverters._
+
+object V1FunctionConverter {
+
+  /** Converts spark [[FunctionIdentifier]] to paimon [[Identifier]]. */
+  def fromFunctionIdentifier(ident: FunctionIdentifier): Identifier = {
+    new Identifier(ident.database.get, ident.funcName)
+  }
+
+  /** Converts paimon [[Identifier]] to spark [[FunctionIdentifier]]. */
+  def toFunctionIdentifier(ident: Identifier): FunctionIdentifier = {
+    new FunctionIdentifier(ident.getObjectName, Some(ident.getDatabaseName))
+  }
+
+  /** Converts spark [[CatalogFunction]] to paimon [[Function]]. */
+  def fromV1Function(v1Function: CatalogFunction): Function = {
+    val functionIdentifier = v1Function.identifier
+    val identifier = fromFunctionIdentifier(functionIdentifier)
+    val fileResources = v1Function.resources
+      .map(r => new 
FunctionDefinition.FunctionFileResource(r.resourceType.resourceType, r.uri))
+      .toList
+
+    val functionDefinition: FunctionDefinition = FunctionDefinition.file(
+      fileResources.asJava,
+      "JAVA", // Apache Spark only supports JAR persistent function now.
+      v1Function.className,
+      functionIdentifier.funcName)
+    val definitions = Map(FUNCTION_DEFINITION_NAME -> 
functionDefinition).asJava
+
+    new FunctionImpl(identifier, definitions)
+  }
+
+  /** Converts paimon [[Function]] to spark [[CatalogFunction]]. */
+  def toV1Function(paimonFunction: Function): CatalogFunction = {
+    paimonFunction.definition(FUNCTION_DEFINITION_NAME) match {
+      case functionDefinition: FunctionDefinition.FileFunctionDefinition =>
+        val fileResources = functionDefinition
+          .fileResources()
+          .asScala
+          .map(r => 
FunctionResource(FunctionResourceType.fromString(r.resourceType()), r.uri()))
+          .toSeq
+
+        CatalogFunction(
+          new FunctionIdentifier(
+            paimonFunction.name(),
+            Some(paimonFunction.identifier().getDatabaseName)),
+          functionDefinition.className(),
+          fileResources)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported function 
definition $other")
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala
new file mode 100644
index 0000000000..c8a6def184
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalog.functions
+
+import org.apache.paimon.function.{Function => PaimonFunction}
+
+import org.apache.spark.sql.{PaimonUtils, SparkSession}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper}
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, 
FunctionRegistry, FunctionRegistryBase, SimpleFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, 
FunctionExpressionBuilder, FunctionResource, FunctionResourceLoader}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
+import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+
+import java.util.Locale
+
+case class V1FunctionRegistry(session: SparkSession) extends SQLConfHelper {
+
+  // ================== Start Public API ===================
+
+  /**
+   * Register the function and resolves it to an Expression if not registered, 
otherwise returns the
+   * registered Expression.
+   */
+  def registerAndResolveFunction(
+      funcIdent: FunctionIdentifier,
+      func: Option[PaimonFunction],
+      arguments: Seq[Expression]): Expression = {
+    resolvePersistentFunctionInternal(
+      funcIdent,
+      func,
+      arguments,
+      functionRegistry,
+      makeFunctionBuilder)
+  }
+
+  /** Check if the function is registered. */
+  def isRegistered(funcIdent: FunctionIdentifier): Boolean = {
+    val qualifiedIdent = qualifyIdentifier(funcIdent)
+    functionRegistry.functionExists(qualifiedIdent)
+  }
+
+  /** Unregister the function. */
+  def unregisterFunction(funcIdent: FunctionIdentifier): Unit = {
+    val qualifiedIdent = qualifyIdentifier(funcIdent)
+    if (functionRegistry.functionExists(qualifiedIdent)) {
+      functionRegistry.dropFunction(qualifiedIdent)
+    }
+  }
+
+  // ================== End Public API ===================
+
+  // Most copy from spark
+  private val functionResourceLoader: FunctionResourceLoader =
+    SparkShimLoader.shim.classicApi.sessionResourceLoader(session)
+  private val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry
+  private val functionExpressionBuilder: FunctionExpressionBuilder = 
HiveUDFExpressionBuilder
+
+  /** Look up a persistent scalar function by name and resolves it to an 
Expression. */
+  private def resolvePersistentFunctionInternal[T](
+      funcIdent: FunctionIdentifier,
+      func: Option[PaimonFunction],
+      arguments: Seq[Expression],
+      registry: FunctionRegistryBase[T],
+      createFunctionBuilder: CatalogFunction => 
FunctionRegistryBase[T]#FunctionBuilder): T = {
+
+    val name = funcIdent
+    // `synchronized` is used to prevent multiple threads from concurrently 
resolving the
+    // same function that has not yet been loaded into the function registry. 
This is needed
+    // because calling `registerFunction` twice with `overrideIfExists = 
false` can lead to
+    // a FunctionAlreadyExistsException.
+    synchronized {
+      val qualifiedIdent = qualifyIdentifier(name)
+      if (registry.functionExists(qualifiedIdent)) {
+        // This function has been already loaded into the function registry.
+        registry.lookupFunction(qualifiedIdent, arguments)
+      } else {
+        // The function has not been loaded to the function registry, which 
means
+        // that the function is a persistent function (if it actually has been 
registered
+        // in the metastore). We need to first put the function in the 
function registry.
+        require(func.isDefined, "Function must be defined")
+        val catalogFunction = V1FunctionConverter.toV1Function(func.get)
+        loadFunctionResources(catalogFunction.resources)
+        // Please note that qualifiedName is provided by the user. However,
+        // catalogFunction.identifier.unquotedString is returned by the 
underlying
+        // catalog. So, it is possible that qualifiedName is not exactly the 
same as
+        // catalogFunction.identifier.unquotedString (difference is on 
case-sensitivity).
+        // At here, we preserve the input from the user.
+        val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
+        registerFunction(
+          funcMetadata,
+          overrideIfExists = false,
+          registry = registry,
+          functionBuilder = createFunctionBuilder(funcMetadata))
+        // Now, we need to create the Expression.
+        registry.lookupFunction(qualifiedIdent, arguments)
+      }
+    }
+  }
+
+  /**
+   * Loads resources such as JARs and Files for a function. Every resource is 
represented by a tuple
+   * (resource type, resource uri).
+   */
+  private def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
+    resources.foreach(functionResourceLoader.loadResource)
+  }
+
+  private def registerFunction[T](
+      funcDefinition: CatalogFunction,
+      overrideIfExists: Boolean,
+      registry: FunctionRegistryBase[T],
+      functionBuilder: FunctionRegistryBase[T]#FunctionBuilder): Unit = {
+    val func = funcDefinition.identifier
+    if (registry.functionExists(func) && !overrideIfExists) {
+      throw new FunctionAlreadyExistsException(func.nameParts)
+    }
+    val info = makeExprInfoForHiveFunction(funcDefinition)
+    registry.registerFunction(func, info, functionBuilder)
+  }
+
+  private def makeExprInfoForHiveFunction(func: CatalogFunction): 
ExpressionInfo = {
+    new ExpressionInfo(
+      func.className,
+      func.identifier.database.orNull,
+      func.identifier.funcName,
+      null,
+      "",
+      "",
+      "",
+      "",
+      "",
+      "",
+      "hive")
+  }
+
+  /** Constructs a [[FunctionBuilder]] based on the provided function 
metadata. */
+  private def makeFunctionBuilder(func: CatalogFunction): FunctionBuilder = {
+    val className = func.className
+    if (!PaimonUtils.classIsLoadable(className)) {
+      throw new IllegalArgumentException(s"Cannot load class: $className")
+    }
+    val clazz = PaimonUtils.classForName(className)
+    val name = func.identifier.unquotedString
+    (input) => functionExpressionBuilder.makeExpression(name, clazz, input)
+  }
+
+  /**
+   * Qualifies the function identifier with the current database if not 
specified, and normalize all
+   * the names.
+   */
+  private def qualifyIdentifier(ident: FunctionIdentifier): FunctionIdentifier 
= {
+    FunctionIdentifier(funcName = format(ident.funcName), database = 
ident.database)
+  }
+
+  /** Formats object names, taking into account case sensitivity. */
+  protected def format(name: String): String = {
+    if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
new file mode 100644
index 0000000000..aa149e892a
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.function.{Function => PaimonFunction}
+import org.apache.paimon.spark.catalog.SupportV1Function
+
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.expressions.Expression
+import 
org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Function
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_FUNCTION
+
+case class PaimonFunctionResolver() extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    
plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
+      case l: LogicalPlan =>
+        
l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
+          case UnResolvedPaimonV1Function(
+                v1FunctionCatalog: SupportV1Function,
+                funcIdent: FunctionIdentifier,
+                func: Option[PaimonFunction],
+                arguments: Seq[Expression]) =>
+            v1FunctionCatalog.registerAndResolveV1Function(funcIdent, func, 
arguments)
+        }
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
new file mode 100644
index 0000000000..0d0c7f1161
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.function.FunctionDefinition
+import org.apache.paimon.spark.SparkCatalog.FUNCTION_DEFINITION_NAME
+import org.apache.paimon.spark.catalog.SupportV1Function
+import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.types.StringType
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class CreatePaimonV1FunctionCommand(
+    catalog: SupportV1Function,
+    v1Function: CatalogFunction,
+    ignoreIfExists: Boolean,
+    replace: Boolean)
+  extends PaimonLeafRunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    // Note: for replace just drop then create ,this operation is non-atomic.
+    if (replace) {
+      catalog.dropV1Function(v1Function.identifier, true)
+    }
+    catalog.createV1Function(v1Function, ignoreIfExists)
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreatePaimonV1FunctionCommand: ${v1Function.identifier}"
+  }
+}
+
+case class DropPaimonV1FunctionCommand(
+    catalog: SupportV1Function,
+    funcIdent: FunctionIdentifier,
+    ifExists: Boolean)
+  extends PaimonLeafRunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    catalog.dropV1Function(funcIdent, ifExists)
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"DropPaimonV1FunctionCommand: $funcIdent"
+  }
+}
+
+case class DescribePaimonV1FunctionCommand(
+    catalog: SupportV1Function,
+    funcIdent: FunctionIdentifier,
+    isExtended: Boolean)
+  extends PaimonLeafRunnableCommand {
+
+  override val output: Seq[Attribute] = {
+    Seq(AttributeReference("function_desc", StringType, nullable = false)())
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val function = catalog.getV1Function(funcIdent)
+    val rows = new ArrayBuffer[Row]()
+    function.definition(FUNCTION_DEFINITION_NAME) match {
+      case functionDefinition: FunctionDefinition.FileFunctionDefinition =>
+        rows += Row(s"Function: ${function.fullName()}")
+        rows += Row(s"Class: ${functionDefinition.className()}")
+        if (isExtended) {
+          rows += Row(
+            s"File Resources: 
${functionDefinition.fileResources().asScala.map(_.uri()).mkString(", ")}")
+        }
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported function 
definition $other")
+    }
+
+    rows.toSeq
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"DescribePaimonV1FunctionCommand: $funcIdent"
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 9a7f62e6db..f1a93c26de 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.extensions
 
-import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, 
PaimonDeleteTable, PaimonIncompatiblePHRRules, 
PaimonIncompatibleResolutionRules, PaimonMergeInto, 
PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, 
PaimonViewResolver, ReplacePaimonFunctions, RewriteUpsertTable}
+import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, 
PaimonDeleteTable, PaimonFunctionResolver, PaimonIncompatiblePHRRules, 
PaimonIncompatibleResolutionRules, PaimonMergeInto, 
PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, 
PaimonViewResolver, ReplacePaimonFunctions, RewriteUpsertTable}
 import 
org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, 
MergePaimonScalarSubqueries}
 import 
org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
 import org.apache.paimon.spark.commands.BucketExpression
@@ -39,6 +39,7 @@ class PaimonSparkSessionExtensions extends 
(SparkSessionExtensions => Unit) {
     extensions.injectResolutionRule(spark => new PaimonAnalysis(spark))
     extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark))
     extensions.injectResolutionRule(spark => PaimonViewResolver(spark))
+    extensions.injectResolutionRule(_ => PaimonFunctionResolver())
     extensions.injectResolutionRule(spark => 
SparkShimLoader.shim.createCustomResolution(spark))
     extensions.injectResolutionRule(spark => 
PaimonIncompatibleResolutionRules(spark))
     extensions.injectResolutionRule(spark => RewriteUpsertTable(spark))
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 2acfc4665d..dd35dd394d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.util
 
 import org.apache.paimon.catalog.Identifier
 import org.apache.paimon.options.ConfigOption
-import org.apache.paimon.spark.SparkConnectorOptions
+import org.apache.paimon.spark.{SparkCatalogOptions, SparkConnectorOptions}
 import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
@@ -71,6 +71,10 @@ object OptionUtils extends SQLConfHelper {
     getOptionString(SparkConnectorOptions.EXPLICIT_CAST).toBoolean
   }
 
+  def v1FunctionEnabled(): Boolean = {
+    getOptionString(SparkCatalogOptions.V1FUNCTION_ENABLED).toBoolean
+  }
+
   private def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, 
String] = {
     val mergedOptions = new JHashMap[String, String](
       conf.getAllConfs
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 885910dbc2..8f46f25518 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -126,4 +126,12 @@ object PaimonUtils {
   def sameType(left: DataType, right: DataType): Boolean = {
     left.sameType(right)
   }
+
+  def classIsLoadable(clazz: String): Boolean = {
+    SparkUtils.classIsLoadable(clazz)
+  }
+
+  def classForName(clazz: String): Class[_] = {
+    SparkUtils.classForName(clazz)
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index 73e1ea3ec8..2e84df8027 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
 import 
org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser.{NonReservedContext,
 QuotedIdentifierContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.VariableSubstitution
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -65,10 +66,23 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
       parse(sqlTextAfterSubstitution)(parser => 
astBuilder.visit(parser.singleStatement()))
         .asInstanceOf[LogicalPlan]
     } else {
-      
RewritePaimonViewCommands(PaimonSparkSession.active).apply(delegate.parsePlan(sqlText))
+      var plan = delegate.parsePlan(sqlText)
+      val sparkSession = PaimonSparkSession.active
+      parserRules(sparkSession).foreach(
+        rule => {
+          plan = rule.apply(plan)
+        })
+      plan
     }
   }
 
+  private def parserRules(sparkSession: SparkSession): Seq[Rule[LogicalPlan]] 
= {
+    Seq(
+      RewritePaimonViewCommands(sparkSession),
+      RewritePaimonFunctionCommands(sparkSession)
+    )
+  }
+
   /** Parses a string to an Expression. */
   override def parseExpression(sqlText: String): Expression =
     delegate.parseExpression(sqlText)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
new file mode 100644
index 0000000000..56c383f6d2
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.paimon.function.{Function => PaimonFunction}
+import org.apache.paimon.function.FunctionDefinition
+import org.apache.paimon.spark.SparkCatalog.FUNCTION_DEFINITION_NAME
+import org.apache.paimon.spark.catalog.SupportV1Function
+import org.apache.paimon.spark.catalog.functions.PaimonFunctions
+import org.apache.paimon.spark.execution.{CreatePaimonV1FunctionCommand, 
DescribePaimonV1FunctionCommand, DropPaimonV1FunctionCommand}
+import org.apache.paimon.spark.util.OptionUtils
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, 
UnresolvedException, UnresolvedFunction, UnresolvedFunctionName, 
UnresolvedIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateFunction, 
DescribeFunction, DropFunction, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, 
UNRESOLVED_FUNCTION}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, 
LookupCatalog}
+import org.apache.spark.sql.types.DataType
+
+case class RewritePaimonFunctionCommands(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = 
spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    // Add a global switch to enable/disable v1 function.
+    if (!OptionUtils.v1FunctionEnabled()) {
+      return plan
+    }
+
+    plan.resolveOperatorsUp {
+      case CreateFunction(
+            CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, 
funcIdent),
+            className,
+            resources,
+            ifExists,
+            replace) =>
+        if (isPaimonBuildInFunction(funcIdent)) {
+          throw new UnsupportedOperationException(s"Can't create build-in 
function: $funcIdent")
+        }
+        val v1Function = CatalogFunction(funcIdent, className, resources)
+        CreatePaimonV1FunctionCommand(v1FunctionCatalog, v1Function, ifExists, 
replace)
+
+      case DropFunction(
+            CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, 
funcIdent),
+            ifExists) =>
+        if (isPaimonBuildInFunction(funcIdent)) {
+          throw new UnsupportedOperationException(s"Can't drop build-in 
function: $funcIdent")
+        }
+        DropPaimonV1FunctionCommand(v1FunctionCatalog, funcIdent, ifExists)
+
+      case DescribeFunction(
+            CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, 
funcIdent),
+            isExtended)
+          // For Paimon built-in functions, Spark will resolve them by itself.
+          if !isPaimonBuildInFunction(funcIdent) =>
+        DescribePaimonV1FunctionCommand(v1FunctionCatalog, funcIdent, 
isExtended)
+
+      // Needs to be done here and transform to `UnResolvedPaimonV1Function`, 
so that spark's Analyzer can resolve
+      // the 'arguments' without throwing an exception, saying that function 
is not supported.
+      case l: LogicalPlan =>
+        
l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
+          case u: UnresolvedFunction =>
+            CatalogAndFunctionIdentifier.unapply(u.nameParts) match {
+              case Some((v1FunctionCatalog: SupportV1Function, funcIdent))
+                  // For Paimon built-in functions, Spark will resolve them by 
itself.
+                  if !isPaimonBuildInFunction(funcIdent) =>
+                // If the function is already registered, avoid redundant 
lookup in the catalog to reduce overhead.
+                if (v1FunctionCatalog.v1FunctionRegistered(funcIdent)) {
+                  UnResolvedPaimonV1Function(v1FunctionCatalog, funcIdent, 
None, u.arguments)
+                } else {
+                  val function = v1FunctionCatalog.getV1Function(funcIdent)
+                  function.definition(FUNCTION_DEFINITION_NAME) match {
+                    case _: FunctionDefinition.FileFunctionDefinition =>
+                      if (u.isDistinct && u.filter.isDefined) {
+                        throw new UnsupportedOperationException(
+                          s"DISTINCT with FILTER is not supported, func name: 
$funcIdent")
+                      }
+                      UnResolvedPaimonV1Function(
+                        v1FunctionCatalog,
+                        funcIdent,
+                        Some(function),
+                        u.arguments)
+                    case _ => u
+                  }
+                }
+              case _ => u
+            }
+        }
+    }
+  }
+
+  private object CatalogAndFunctionIdentifier {
+
+    def unapply(unresolved: LogicalPlan): Option[(CatalogPlugin, 
FunctionIdentifier)] =
+      unresolved match {
+        case ui: UnresolvedIdentifier =>
+          unapply(ui.nameParts)
+        case name: UnresolvedFunctionName =>
+          unapply(name.multipartIdentifier)
+        case _ =>
+          None
+      }
+
+    def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, 
FunctionIdentifier)] = {
+      nameParts match {
+        // Spark's built-in functions is without database name or catalog name.
+        case Seq(funName) if 
isSparkBuiltInFunction(FunctionIdentifier(funName)) =>
+          None
+        case CatalogAndIdentifier(v1FunctionCatalog: SupportV1Function, ident)
+            if v1FunctionCatalog.v1FunctionEnabled() =>
+          Some(v1FunctionCatalog, FunctionIdentifier(ident.name(), 
Some(ident.namespace().last)))
+        case _ =>
+          None
+      }
+    }
+  }
+
+  private def isPaimonBuildInFunction(funcIdent: FunctionIdentifier): Boolean 
= {
+    PaimonFunctions.names.contains(funcIdent.funcName)
+  }
+
+  private def isSparkBuiltInFunction(funcIdent: FunctionIdentifier): Boolean = 
{
+    FunctionRegistry.builtin.functionExists(funcIdent)
+  }
+}
+
+/** An unresolved Paimon V1 function to let Spark resolve the necessary 
variables. */
+case class UnResolvedPaimonV1Function(
+    v1FunctionCatalog: SupportV1Function,
+    funcIdent: FunctionIdentifier,
+    func: Option[PaimonFunction],
+    arguments: Seq[Expression])
+  extends Expression
+  with Unevaluable {
+
+  override def children: Seq[Expression] = arguments
+
+  override def dataType: DataType = throw new UnresolvedException("dataType")
+
+  override def nullable: Boolean = throw new UnresolvedException("nullable")
+
+  override lazy val resolved = false
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_FUNCTION)
+
+  override def prettyName: String = funcIdent.identifier
+
+  override protected def withNewChildrenInternal(
+      newChildren: IndexedSeq[Expression]): UnResolvedPaimonV1Function = {
+    copy(arguments = newChildren)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala
index 21381cca29..d320def940 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.paimon.shims
 
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.FunctionResourceLoader
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.execution.SparkPlan
@@ -42,4 +43,5 @@ trait ClassicApi {
       relation: LogicalPlan,
       columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat])
 
+  def sessionResourceLoader(session: SparkSession): FunctionResourceLoader
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/resources/function/hive-test-udfs.jar 
b/paimon-spark/paimon-spark-ut/src/test/resources/function/hive-test-udfs.jar
new file mode 100644
index 0000000000..a5bfa456f6
Binary files /dev/null and 
b/paimon-spark/paimon-spark-ut/src/test/resources/function/hive-test-udfs.jar 
differ
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestWithRestCatalogBase.scala
similarity index 92%
rename from 
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala
rename to 
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestWithRestCatalogBase.scala
index 124a089031..659e668709 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestWithRestCatalogBase.scala
@@ -30,11 +30,11 @@ import org.assertj.core.api.Assertions
 
 import java.util.UUID
 
-class PaimonRestCatalogSparkTestBase extends PaimonSparkTestBase {
+class PaimonSparkTestWithRestCatalogBase extends PaimonSparkTestBase {
 
-  private var restCatalogServer: RESTCatalogServer = null
-  private var serverUrl: String = null
-  private var warehouse: String = null
+  private var restCatalogServer: RESTCatalogServer = _
+  private var serverUrl: String = _
+  private var warehouse: String = _
   private val initToken = "init_token"
 
   override protected def beforeAll(): Unit = {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
index 1db9410a46..e0080f6f9c 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
@@ -18,13 +18,14 @@
 
 package org.apache.paimon.spark.procedure
 
-import org.apache.paimon.spark.PaimonRestCatalogSparkTestBase
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
 import org.apache.paimon.spark.catalog.SupportView
 
 import org.apache.spark.sql.Row
 import org.assertj.core.api.Assertions
 
-class AlterViewDialectProcedureTest extends PaimonRestCatalogSparkTestBase {
+class AlterViewDialectProcedureTest extends PaimonSparkTestWithRestCatalogBase 
{
+
   test(s"test alter view dialect procedure") {
     val viewName = "view_test"
     spark.sql(s"""
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
index b52022b89a..f3ab65bbd0 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
@@ -18,13 +18,14 @@
 
 package org.apache.paimon.spark.procedure
 
-import org.apache.paimon.spark.PaimonRestCatalogSparkTestBase
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
 
 import org.apache.spark.sql.Row
 import org.assertj.core.api.Assertions.assertThat
 
 /** Test for Function Procedure. */
-class FunctionProcedureTest extends PaimonRestCatalogSparkTestBase {
+class FunctionProcedureTest extends PaimonSparkTestWithRestCatalogBase {
+
   test(s"test function procedure") {
     val functionName = "function_test"
     checkAnswer(
@@ -32,14 +33,13 @@ class FunctionProcedureTest extends 
PaimonRestCatalogSparkTestBase {
         "'[{\"id\": 0, \"name\":\"length\", \"type\":\"INT\"}, {\"id\": 1, 
\"name\":\"width\", \"type\":\"INT\"}]'," +
         " '[{\"id\": 0, \"name\":\"area\", \"type\":\"BIGINT\"}]', true, 
'comment', 'k1=v1,k2=v2')"),
       Row(true)
-    );
+    )
     assertThat(
       spark
         .sql("SHOW FUNCTIONS")
         .collect()
         .map(r => r.getString(0))
-        .filter(v => v == s"paimon.test.$functionName")
-        .size).isEqualTo(1)
+        .count(v => v == s"paimon.test.$functionName")).isEqualTo(1)
     checkAnswer(
       spark.sql(s"CALL sys.alter_function('$functionName', " +
         "'{\"action\" : \"addDefinition\", \"name\" : \"spark\", 
\"definition\" : {\"type\" : \"lambda\", \"definition\" : \"(Integer length, 
Integer width) -> { return (long) length * width; }\", \"language\": \"JAVA\" } 
}')"),
@@ -48,17 +48,16 @@ class FunctionProcedureTest extends 
PaimonRestCatalogSparkTestBase {
     checkAnswer(
       spark.sql(s"select paimon.test.$functionName(1, 2)"),
       Row(2)
-    );
+    )
     checkAnswer(
       spark.sql(s"CALL sys.drop_function('$functionName')"),
       Row(true)
-    );
+    )
     assertThat(
       spark
         .sql("SHOW FUNCTIONS")
         .collect()
         .map(r => r.getString(0))
-        .filter(v => v == s"paimon.test.$functionName")
-        .size).isEqualTo(0);
+        .count(v => v == s"paimon.test.$functionName")).isEqualTo(0);
   }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
new file mode 100644
index 0000000000..5974d9809a
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
+import org.apache.paimon.spark.sql.FunctionResources._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+abstract class PaimonV1FunctionTestBase extends 
PaimonSparkTestWithRestCatalogBase {
+
+  test("Paimon V1 Function: create or replace function") {
+    withUserDefinedFunction("udf_add2" -> false) {
+      sql(s"""
+             |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+      checkAnswer(sql("SELECT udf_add2(1, 2)"), Seq(Row(3)))
+
+      // create again should throw exception
+      intercept[Exception] {
+        sql(s"""
+               |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+               |USING JAR '$testUDFJarPath'
+               |""".stripMargin)
+      }
+
+      sql(s"""
+             |CREATE FUNCTION IF NOT EXISTS udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+
+      // create or replace
+      sql(s"""
+             |CREATE OR REPLACE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+      checkAnswer(sql("SELECT udf_add2(3, 4)"), Seq(Row(7)))
+    }
+  }
+
+  test("Paimon V1 Function: use function with catalog name and database name") 
{
+    withUserDefinedFunction("udf_add2" -> false) {
+      sql(s"""
+             |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+      checkAnswer(sql("SELECT udf_add2(3, 4)"), Seq(Row(7)))
+      sql("DROP FUNCTION udf_add2")
+
+      sql(s"""
+             |CREATE FUNCTION test.udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+      checkAnswer(sql("SELECT test.udf_add2(3, 4)"), Seq(Row(7)))
+      sql("DROP FUNCTION test.udf_add2")
+
+      sql(s"""
+             |CREATE FUNCTION paimon.test.udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+      checkAnswer(sql("SELECT paimon.test.udf_add2(3, 4)"), Seq(Row(7)))
+      sql("DROP FUNCTION paimon.test.udf_add2")
+    }
+  }
+
+  test("Paimon V1 Function: select with attribute") {
+    withUserDefinedFunction("udf_add2" -> false) {
+      sql(s"""
+             |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+      withTable("t") {
+        sql("CREATE TABLE t (a INT, b INT)")
+        sql("INSERT INTO t VALUES (1, 2), (3, 4)")
+        checkAnswer(sql("SELECT udf_add2(a, b) FROM t"), Seq(Row(3), Row(7)))
+      }
+    }
+  }
+
+  test("Paimon V1 Function: select with build-in function") {
+    withUserDefinedFunction("udf_add2" -> false) {
+      sql(s"""
+             |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+      withTable("t") {
+        sql("CREATE TABLE t (a INT, pt INT) USING paimon PARTITIONED BY (pt)")
+        sql("INSERT INTO t VALUES (1, 2), (3, 4)")
+        checkAnswer(
+          sql(
+            "SELECT a, udf_add2(pow(a, pt), max_pt('t')), pow(a, udf_add2(a, 
pt)) FROM t ORDER BY a"),
+          Seq(Row(1, 5.0d, 1.0d), Row(3, 85.0d, 2187.0d))
+        )
+      }
+    }
+  }
+
+  test("Paimon V1 Function: drop function") {
+    withUserDefinedFunction("udf_add2" -> false) {
+      sql(s"""
+             |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+      checkAnswer(sql("SELECT udf_add2(1, 2)"), Seq(Row(3)))
+
+      sql("DROP FUNCTION udf_add2")
+      intercept[Exception] {
+        sql("SELECT udf_add2(3, 4)")
+      }
+      sql("DROP FUNCTION IF EXISTS udf_add2")
+    }
+  }
+
+  test("Paimon V1 Function: describe function") {
+    withUserDefinedFunction("udf_add2" -> false) {
+      sql(s"""
+             |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+
+      checkAnswer(
+        sql("DESCRIBE FUNCTION udf_add2"),
+        Seq(Row("Function: test.udf_add2"), Row(s"Class: 
$UDFExampleAdd2Class"))
+      )
+    }
+  }
+
+  test("Paimon V1 Function: unsupported operation") {
+    // create a build-in function
+    intercept[Exception] {
+      sql(s"""
+             |CREATE FUNCTION max_pt AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+    }
+
+    // drop a build-in function
+    intercept[Exception] {
+      sql("DROP FUNCTION max_pt")
+    }
+  }
+}
+
+class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.v1Function.enabled", "false")
+  }
+
+  test("Paimon V1 Function: disable paimon v1 function") {
+    intercept[Exception] {
+      sql(s"""
+             |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+    }
+  }
+}
+
+object FunctionResources {
+
+  val testUDFJarPath: String =
+    getClass.getClassLoader.getResource("function/hive-test-udfs.jar").getPath
+
+  val UDFExampleAdd2Class: String = 
"org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2"
+}
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala
index b0782c59d6..e0ff2caf06 100644
--- 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala
@@ -19,10 +19,12 @@
 package org.apache.spark.sql.paimon.shims
 
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.FunctionResourceLoader
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.internal.SessionResourceLoader
 class Classic3Api extends ClassicApi {
 
   override def column(expression: Expression): Column = new Column(expression)
@@ -52,4 +54,7 @@ class Classic3Api extends ClassicApi {
       columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) =
     CommandUtils.computeColumnStats(spark, relation, columns)
 
+  override def sessionResourceLoader(session: SparkSession): 
FunctionResourceLoader = {
+    new SessionResourceLoader(session)
+  }
 }
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala
index 8dff78f0bc..860ecc28d3 100644
--- 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala
@@ -19,11 +19,13 @@
 package org.apache.spark.sql.paimon.shims
 
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.FunctionResourceLoader
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
 import org.apache.spark.sql.classic.{ClassicConversions, Dataset => 
ClassicDataset, ExpressionUtils}
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.internal.SessionResourceLoader
 
 /**
  * This class is used to implement the conversion from sql-api to classic one. 
Make sure this is the
@@ -60,4 +62,8 @@ class Classic4Api extends ClassicApi with ClassicConversions {
       columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
     CommandUtils.computeColumnStats(spark, relation, columns.toSeq)
   }
+
+  override def sessionResourceLoader(session: SparkSession): 
FunctionResourceLoader = {
+    new SessionResourceLoader(session)
+  }
 }
diff --git a/paimon-spark/pom.xml b/paimon-spark/pom.xml
index 49823bf4b5..a546d8d889 100644
--- a/paimon-spark/pom.xml
+++ b/paimon-spark/pom.xml
@@ -115,6 +115,21 @@ under the License.
                 </exclusions>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.spark</groupId>
+                <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+                <exclusions>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
             <dependency>
                 <groupId>org.apache.spark</groupId>
                 <artifactId>spark-hive_${scala.binary.version}</artifactId>


Reply via email to