This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ee2528251b [spark] Add temporary v1 function and UDAF support to 
SparkCatalog (#6101)
ee2528251b is described below

commit ee2528251b4c975608820e0830d48c0a59aec223
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Aug 20 11:38:23 2025 +0800

    [spark] Add temporary v1 function and UDAF support to SparkCatalog (#6101)
---
 docs/content/spark/sql-functions.md                |  12 +-
 .../java/org/apache/paimon/spark/SparkCatalog.java |  18 ++-
 .../paimon/spark/catalog/SupportV1Function.java    |  10 +-
 .../catalyst/analysis/PaimonFunctionResolver.scala |  26 ++--
 .../spark/execution/PaimonFunctionExec.scala       |   6 +-
 .../extensions/PaimonSparkSessionExtensions.scala  |   2 +-
 .../catalog/PaimonV1FunctionRegistry.scala}        | 133 +++++++++++++++++++--
 .../extensions/RewritePaimonFunctionCommands.scala |  86 ++++++++-----
 .../paimon/spark/function/FunctionResources.scala  |  60 ++++++++++
 .../paimon/spark/sql/PaimonFunctionTest.scala      |  32 +----
 .../spark/sql/PaimonV1FunctionTestBase.scala       |  27 +++--
 11 files changed, 298 insertions(+), 114 deletions(-)

diff --git a/docs/content/spark/sql-functions.md 
b/docs/content/spark/sql-functions.md
index 3c280c30ff..65019ac5fd 100644
--- a/docs/content/spark/sql-functions.md
+++ b/docs/content/spark/sql-functions.md
@@ -91,24 +91,26 @@ CALL sys.drop_function(`function` => 'my_db.area_func');
 
 Users can define functions within a file, providing flexibility and modular 
support for function definition, only supports jar files now.
 
+Currently, Spark and Hive implementations of UDFs and UDAFs are supported; see 
[Spark 
UDFs](https://spark.apache.org/docs/latest/sql-ref-functions.html#udfs-user-defined-functions).
+
 This feature requires Spark 3.4 or higher.
 
 **Example**
 
 ```sql
--- Create Function
-CREATE FUNCTION mydb.simple_udf
+-- Create Function or Temporary Function (a temporary function must not 
specify a database name)
+CREATE [TEMPORARY] FUNCTION <mydb>.simple_udf
 AS 'com.example.SimpleUdf' 
 USING JAR '/tmp/SimpleUdf.jar' [, JAR '/tmp/SimpleUdfR.jar'];
 
 -- Create or Replace Function
-CREATE OR REPLACE FUNCTION mydb.simple_udf 
+CREATE OR REPLACE FUNCTION <mydb>.simple_udf 
 AS 'com.example.SimpleUdf'
 USING JAR '/tmp/SimpleUdf.jar';
        
 -- Describe Function
-DESCRIBE FUNCTION [EXTENDED] mydb.simple_udf;
+DESCRIBE FUNCTION [EXTENDED] <mydb>.simple_udf;
 
 -- Drop Function
-DROP FUNCTION mydb.simple_udf;
+DROP [TEMPORARY] FUNCTION <mydb>.simple_udf;
 ```
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index c172f4d60c..eee3991ad3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -37,7 +37,6 @@ import org.apache.paimon.spark.catalog.SupportV1Function;
 import org.apache.paimon.spark.catalog.SupportView;
 import org.apache.paimon.spark.catalog.functions.PaimonFunctions;
 import org.apache.paimon.spark.catalog.functions.V1FunctionConverter;
-import org.apache.paimon.spark.catalog.functions.V1FunctionRegistry;
 import org.apache.paimon.spark.utils.CatalogUtils;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.types.DataField;
@@ -54,7 +53,9 @@ import 
org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.catalyst.catalog.CatalogFunction;
+import org.apache.spark.sql.catalyst.catalog.PaimonV1FunctionRegistry;
 import org.apache.spark.sql.catalyst.expressions.Expression;
+import 
org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Function;
 import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
@@ -91,9 +92,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import scala.Option;
-import scala.collection.Seq;
-
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
@@ -125,7 +123,7 @@ public class SparkCatalog extends SparkBaseCatalog
     private Catalog catalog;
     private String defaultDatabase;
     private boolean v1FunctionEnabled;
-    @Nullable private V1FunctionRegistry v1FunctionRegistry;
+    @Nullable private PaimonV1FunctionRegistry v1FunctionRegistry;
 
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
@@ -144,7 +142,7 @@ public class SparkCatalog extends SparkBaseCatalog
                                 
SparkCatalogOptions.V1FUNCTION_ENABLED.defaultValue())
                         && DelegateCatalog.rootCatalog(catalog) instanceof 
RESTCatalog;
         if (v1FunctionEnabled) {
-            this.v1FunctionRegistry = new V1FunctionRegistry(sparkSession);
+            this.v1FunctionRegistry = new 
PaimonV1FunctionRegistry(sparkSession);
         }
         try {
             catalog.getDatabase(defaultDatabase);
@@ -674,7 +672,7 @@ public class SparkCatalog extends SparkBaseCatalog
         return namespace.length == 0 || (namespace.length == 1 && 
namespaceExists(namespace));
     }
 
-    private V1FunctionRegistry v1FunctionRegistry() {
+    private PaimonV1FunctionRegistry v1FunctionRegistry() {
         assert v1FunctionRegistry != null;
         return v1FunctionRegistry;
     }
@@ -685,7 +683,7 @@ public class SparkCatalog extends SparkBaseCatalog
     }
 
     @Override
-    public Function getV1Function(FunctionIdentifier funcIdent) throws 
Exception {
+    public Function getFunction(FunctionIdentifier funcIdent) throws Exception 
{
         return 
paimonCatalog().getFunction(V1FunctionConverter.fromFunctionIdentifier(funcIdent));
     }
 
@@ -707,8 +705,8 @@ public class SparkCatalog extends SparkBaseCatalog
 
     @Override
     public Expression registerAndResolveV1Function(
-            FunctionIdentifier funcIdent, Option<Function> func, 
Seq<Expression> arguments) {
-        return v1FunctionRegistry().registerAndResolveFunction(funcIdent, 
func, arguments.toSeq());
+            UnResolvedPaimonV1Function unresolvedV1Function) {
+        return 
v1FunctionRegistry().registerAndResolveFunction(unresolvedV1Function);
     }
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
index 2506713ef4..4c070c5949 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
@@ -23,9 +23,7 @@ import org.apache.paimon.function.Function;
 import org.apache.spark.sql.catalyst.FunctionIdentifier;
 import org.apache.spark.sql.catalyst.catalog.CatalogFunction;
 import org.apache.spark.sql.catalyst.expressions.Expression;
-
-import scala.Option;
-import scala.collection.Seq;
+import 
org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Function;
 
 /** Catalog supports v1 function. */
 public interface SupportV1Function extends WithPaimonCatalog {
@@ -33,7 +31,7 @@ public interface SupportV1Function extends WithPaimonCatalog {
     boolean v1FunctionEnabled();
 
     /** Look up the function in the catalog. */
-    Function getV1Function(FunctionIdentifier funcIdent) throws Exception;
+    Function getFunction(FunctionIdentifier funcIdent) throws Exception;
 
     void createV1Function(CatalogFunction v1Function, boolean ignoreIfExists) 
throws Exception;
 
@@ -43,8 +41,8 @@ public interface SupportV1Function extends WithPaimonCatalog {
      * Register the function and resolves it to an Expression if not 
registered, otherwise returns
      * the registered Expression.
      */
-    Expression registerAndResolveV1Function(
-            FunctionIdentifier funcIdent, Option<Function> func, 
Seq<Expression> arguments);
+    Expression registerAndResolveV1Function(UnResolvedPaimonV1Function 
unresolvedV1Function)
+            throws Exception;
 
     /** Unregister the func first, then drop it. */
     void dropV1Function(FunctionIdentifier funcIdent, boolean ifExists) throws 
Exception;
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
index aa149e892a..507cbf79de 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
@@ -18,28 +18,34 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.function.{Function => PaimonFunction}
 import org.apache.paimon.spark.catalog.SupportV1Function
 
-import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.SparkSession
 import 
org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Function
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_FUNCTION
 
-case class PaimonFunctionResolver() extends Rule[LogicalPlan] {
+case class PaimonFunctionResolver(spark: SparkSession) extends 
Rule[LogicalPlan] {
+
+  protected lazy val catalogManager = spark.sessionState.catalogManager
 
   override def apply(plan: LogicalPlan): LogicalPlan =
     
plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
       case l: LogicalPlan =>
         
l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
-          case UnResolvedPaimonV1Function(
-                v1FunctionCatalog: SupportV1Function,
-                funcIdent: FunctionIdentifier,
-                func: Option[PaimonFunction],
-                arguments: Seq[Expression]) =>
-            v1FunctionCatalog.registerAndResolveV1Function(funcIdent, func, 
arguments)
+          case u: UnResolvedPaimonV1Function =>
+            u.funcIdent.catalog match {
+              case Some(catalog) =>
+                catalogManager.catalog(catalog) match {
+                  case v1FunctionCatalog: SupportV1Function =>
+                    v1FunctionCatalog.registerAndResolveV1Function(u)
+                  case _ =>
+                    throw new IllegalArgumentException(
+                      s"Catalog $catalog is not a v1 function catalog")
+                }
+              case None => throw new IllegalArgumentException("Catalog name is 
not defined")
+            }
         }
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
index 0d0c7f1161..fb0a2ec014 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
@@ -68,8 +68,7 @@ case class DropPaimonV1FunctionCommand(
 }
 
 case class DescribePaimonV1FunctionCommand(
-    catalog: SupportV1Function,
-    funcIdent: FunctionIdentifier,
+    function: org.apache.paimon.function.Function,
     isExtended: Boolean)
   extends PaimonLeafRunnableCommand {
 
@@ -78,7 +77,6 @@ case class DescribePaimonV1FunctionCommand(
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val function = catalog.getV1Function(funcIdent)
     val rows = new ArrayBuffer[Row]()
     function.definition(FUNCTION_DEFINITION_NAME) match {
       case functionDefinition: FunctionDefinition.FileFunctionDefinition =>
@@ -96,6 +94,6 @@ case class DescribePaimonV1FunctionCommand(
   }
 
   override def simpleString(maxFields: Int): String = {
-    s"DescribePaimonV1FunctionCommand: $funcIdent"
+    s"DescribePaimonV1FunctionCommand: ${function.fullName()}"
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index f1a93c26de..8e363c5026 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -39,7 +39,7 @@ class PaimonSparkSessionExtensions extends 
(SparkSessionExtensions => Unit) {
     extensions.injectResolutionRule(spark => new PaimonAnalysis(spark))
     extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark))
     extensions.injectResolutionRule(spark => PaimonViewResolver(spark))
-    extensions.injectResolutionRule(_ => PaimonFunctionResolver())
+    extensions.injectResolutionRule(spark => PaimonFunctionResolver(spark))
     extensions.injectResolutionRule(spark => 
SparkShimLoader.shim.createCustomResolution(spark))
     extensions.injectResolutionRule(spark => 
PaimonIncompatibleResolutionRules(spark))
     extensions.injectResolutionRule(spark => RewriteUpsertTable(spark))
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/PaimonV1FunctionRegistry.scala
similarity index 56%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/PaimonV1FunctionRegistry.scala
index c8a6def184..b5a70c329e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/PaimonV1FunctionRegistry.scala
@@ -16,22 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.catalog.functions
+package org.apache.spark.sql.catalyst.catalog
 
 import org.apache.paimon.function.{Function => PaimonFunction}
+import org.apache.paimon.spark.catalog.functions.V1FunctionConverter
 
 import org.apache.spark.sql.{PaimonUtils, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, 
FunctionRegistry, FunctionRegistryBase, SimpleFunctionRegistry}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, 
FunctionExpressionBuilder, FunctionResource, FunctionResourceLoader}
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.expressions.{AggregateWindowFunction, 
Expression, ExpressionInfo, FrameLessOffsetWindowFunction, Lag, Lead, NthValue, 
WindowExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import 
org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Function
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
 import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.BooleanType
 
 import java.util.Locale
 
-case class V1FunctionRegistry(session: SparkSession) extends SQLConfHelper {
+case class PaimonV1FunctionRegistry(session: SparkSession) extends 
SQLConfHelper {
 
   // ================== Start Public API ===================
 
@@ -39,16 +43,14 @@ case class V1FunctionRegistry(session: SparkSession) 
extends SQLConfHelper {
    * Register the function and resolves it to an Expression if not registered, 
otherwise returns the
    * registered Expression.
    */
-  def registerAndResolveFunction(
-      funcIdent: FunctionIdentifier,
-      func: Option[PaimonFunction],
-      arguments: Seq[Expression]): Expression = {
-    resolvePersistentFunctionInternal(
-      funcIdent,
-      func,
-      arguments,
+  def registerAndResolveFunction(u: UnResolvedPaimonV1Function): Expression = {
+    val resolvedFun = resolvePersistentFunctionInternal(
+      u.funcIdent,
+      u.func,
+      u.arguments,
       functionRegistry,
       makeFunctionBuilder)
+    validateFunction(resolvedFun, u.arguments.length, u)
   }
 
   /** Check if the function is registered. */
@@ -174,4 +176,111 @@ case class V1FunctionRegistry(session: SparkSession) 
extends SQLConfHelper {
   protected def format(name: String): String = {
     if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
   }
+
+  private def validateFunction(
+      func: Expression,
+      numArgs: Int,
+      u: UnResolvedPaimonV1Function): Expression = {
+    func match {
+      // AggregateWindowFunctions are AggregateFunctions that can only be 
evaluated within
+      // the context of a Window clause. They do not need to be wrapped in an
+      // AggregateExpression.
+      case wf: AggregateWindowFunction =>
+        if (u.isDistinct) {
+          throw 
QueryCompilationErrors.functionWithUnsupportedSyntaxError(wf.prettyName, 
"DISTINCT")
+        } else if (u.filter.isDefined) {
+          throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+            wf.prettyName,
+            "FILTER clause")
+        } else if (u.ignoreNulls) {
+          wf match {
+            case nthValue: NthValue =>
+              nthValue.copy(ignoreNulls = u.ignoreNulls)
+            case _ =>
+              throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                wf.prettyName,
+                "IGNORE NULLS")
+          }
+        } else {
+          wf
+        }
+      case owf: FrameLessOffsetWindowFunction =>
+        if (u.isDistinct) {
+          throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+            owf.prettyName,
+            "DISTINCT")
+        } else if (u.filter.isDefined) {
+          throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+            owf.prettyName,
+            "FILTER clause")
+        } else if (u.ignoreNulls) {
+          owf match {
+            case lead: Lead =>
+              lead.copy(ignoreNulls = u.ignoreNulls)
+            case lag: Lag =>
+              lag.copy(ignoreNulls = u.ignoreNulls)
+          }
+        } else {
+          owf
+        }
+      // We get an aggregate function, we need to wrap it in an 
AggregateExpression.
+      case agg: AggregateFunction =>
+        // Note: PythonUDAF does not support these advanced clauses.
+        // For compatibility with spark3.4
+        if 
(agg.getClass.getName.equals("org.apache.spark.sql.catalyst.expressions.PythonUDAF"))
+          checkUnsupportedAggregateClause(agg, u)
+
+        u.filter match {
+          case Some(filter) if !filter.deterministic =>
+            throw new RuntimeException(
+              "FILTER expression is non-deterministic, it cannot be used in 
aggregate functions.")
+          case Some(filter) if filter.dataType != BooleanType =>
+            throw new RuntimeException(
+              "FILTER expression is not of type boolean. It cannot be used in 
an aggregate function.")
+          case Some(filter) if 
filter.exists(_.isInstanceOf[AggregateExpression]) =>
+            throw new RuntimeException(
+              "FILTER expression contains aggregate. It cannot be used in an 
aggregate function.")
+          case Some(filter) if filter.exists(_.isInstanceOf[WindowExpression]) 
=>
+            throw new RuntimeException(
+              "FILTER expression contains window function. It cannot be used 
in an aggregate function.")
+          case _ =>
+        }
+        if (u.ignoreNulls) {
+          val aggFunc = agg match {
+            case first: First => first.copy(ignoreNulls = u.ignoreNulls)
+            case last: Last => last.copy(ignoreNulls = u.ignoreNulls)
+            case any_value: AnyValue => any_value.copy(ignoreNulls = 
u.ignoreNulls)
+            case _ =>
+              throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+                agg.prettyName,
+                "IGNORE NULLS")
+          }
+          aggFunc.toAggregateExpression(u.isDistinct, u.filter)
+        } else {
+          agg.toAggregateExpression(u.isDistinct, u.filter)
+        }
+      // This function is not an aggregate function, just return the resolved 
one.
+      case other =>
+        checkUnsupportedAggregateClause(other, u)
+        other
+    }
+  }
+
+  private def checkUnsupportedAggregateClause(
+      func: Expression,
+      u: UnResolvedPaimonV1Function): Unit = {
+    if (u.isDistinct) {
+      throw 
QueryCompilationErrors.functionWithUnsupportedSyntaxError(func.prettyName, 
"DISTINCT")
+    }
+    if (u.filter.isDefined) {
+      throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+        func.prettyName,
+        "FILTER clause")
+    }
+    if (u.ignoreNulls) {
+      throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+        func.prettyName,
+        "IGNORE NULLS")
+    }
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
index 56c383f6d2..f295e3eef0 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
@@ -28,7 +28,7 @@ import org.apache.paimon.spark.util.OptionUtils
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, 
UnresolvedException, UnresolvedFunction, UnresolvedFunctionName, 
UnresolvedIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedException, 
UnresolvedFunction, UnresolvedFunctionName, UnresolvedIdentifier}
 import org.apache.spark.sql.catalyst.catalog.CatalogFunction
 import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateFunction, 
DescribeFunction, DropFunction, LogicalPlan}
@@ -68,14 +68,20 @@ case class RewritePaimonFunctionCommands(spark: 
SparkSession)
         if (isPaimonBuildInFunction(funcIdent)) {
           throw new UnsupportedOperationException(s"Can't drop build-in 
function: $funcIdent")
         }
+        // The function may be v1 function or not, anyway it can be safely 
deleted here.
         DropPaimonV1FunctionCommand(v1FunctionCatalog, funcIdent, ifExists)
 
-      case DescribeFunction(
+      case d @ DescribeFunction(
             CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, 
funcIdent),
             isExtended)
           // For Paimon built-in functions, Spark will resolve them by itself.
           if !isPaimonBuildInFunction(funcIdent) =>
-        DescribePaimonV1FunctionCommand(v1FunctionCatalog, funcIdent, 
isExtended)
+        val function = v1FunctionCatalog.getFunction(funcIdent)
+        if (isPaimonV1Function(function)) {
+          DescribePaimonV1FunctionCommand(function, isExtended)
+        } else {
+          d
+        }
 
       // Needs to be done here and transform to `UnResolvedPaimonV1Function`, 
so that spark's Analyzer can resolve
       // the 'arguments' without throwing an exception, saying that function 
is not supported.
@@ -88,21 +94,13 @@ case class RewritePaimonFunctionCommands(spark: 
SparkSession)
                   if !isPaimonBuildInFunction(funcIdent) =>
                 // If the function is already registered, avoid redundant 
lookup in the catalog to reduce overhead.
                 if (v1FunctionCatalog.v1FunctionRegistered(funcIdent)) {
-                  UnResolvedPaimonV1Function(v1FunctionCatalog, funcIdent, 
None, u.arguments)
+                  UnResolvedPaimonV1Function(funcIdent, u, None)
                 } else {
-                  val function = v1FunctionCatalog.getV1Function(funcIdent)
-                  function.definition(FUNCTION_DEFINITION_NAME) match {
-                    case _: FunctionDefinition.FileFunctionDefinition =>
-                      if (u.isDistinct && u.filter.isDefined) {
-                        throw new UnsupportedOperationException(
-                          s"DISTINCT with FILTER is not supported, func name: 
$funcIdent")
-                      }
-                      UnResolvedPaimonV1Function(
-                        v1FunctionCatalog,
-                        funcIdent,
-                        Some(function),
-                        u.arguments)
-                    case _ => u
+                  val function = v1FunctionCatalog.getFunction(funcIdent)
+                  if (isPaimonV1Function(function)) {
+                    UnResolvedPaimonV1Function(funcIdent, u, Some(function))
+                  } else {
+                    u
                   }
                 }
               case _ => u
@@ -125,12 +123,17 @@ case class RewritePaimonFunctionCommands(spark: 
SparkSession)
 
     def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, 
FunctionIdentifier)] = {
       nameParts match {
-        // Spark's built-in functions is without database name or catalog name.
-        case Seq(funName) if 
isSparkBuiltInFunction(FunctionIdentifier(funName)) =>
+        // Spark's built-in or tmp functions is without database name or 
catalog name.
+        case Seq(funName) if 
isSparkBuiltInOrTmpFunction(FunctionIdentifier(funName)) =>
           None
         case CatalogAndIdentifier(v1FunctionCatalog: SupportV1Function, ident)
             if v1FunctionCatalog.v1FunctionEnabled() =>
-          Some(v1FunctionCatalog, FunctionIdentifier(ident.name(), 
Some(ident.namespace().last)))
+          Some(
+            v1FunctionCatalog,
+            FunctionIdentifier(
+              ident.name(),
+              Some(ident.namespace().last),
+              Some(v1FunctionCatalog.name)))
         case _ =>
           None
       }
@@ -141,21 +144,31 @@ case class RewritePaimonFunctionCommands(spark: 
SparkSession)
     PaimonFunctions.names.contains(funcIdent.funcName)
   }
 
-  private def isSparkBuiltInFunction(funcIdent: FunctionIdentifier): Boolean = 
{
-    FunctionRegistry.builtin.functionExists(funcIdent)
+  private def isSparkBuiltInOrTmpFunction(funcIdent: FunctionIdentifier): 
Boolean = {
+    catalogManager.v1SessionCatalog.isBuiltinFunction(funcIdent) || 
catalogManager.v1SessionCatalog
+      .isTemporaryFunction(funcIdent)
+  }
+
+  private def isPaimonV1Function(fun: PaimonFunction): Boolean = {
+    fun.definition(FUNCTION_DEFINITION_NAME) match {
+      case _: FunctionDefinition.FileFunctionDefinition => true
+      case _ => false
+    }
   }
 }
 
 /** An unresolved Paimon V1 function to let Spark resolve the necessary 
variables. */
 case class UnResolvedPaimonV1Function(
-    v1FunctionCatalog: SupportV1Function,
     funcIdent: FunctionIdentifier,
-    func: Option[PaimonFunction],
-    arguments: Seq[Expression])
+    arguments: Seq[Expression],
+    isDistinct: Boolean,
+    filter: Option[Expression] = None,
+    ignoreNulls: Boolean = false,
+    func: Option[PaimonFunction] = None)
   extends Expression
   with Unevaluable {
 
-  override def children: Seq[Expression] = arguments
+  override def children: Seq[Expression] = arguments ++ filter.toSeq
 
   override def dataType: DataType = throw new UnresolvedException("dataType")
 
@@ -167,8 +180,27 @@ case class UnResolvedPaimonV1Function(
 
   override def prettyName: String = funcIdent.identifier
 
+  override def toString: String = {
+    val distinct = if (isDistinct) "distinct " else ""
+    s"'$prettyName($distinct${children.mkString(", ")})"
+  }
+
   override protected def withNewChildrenInternal(
       newChildren: IndexedSeq[Expression]): UnResolvedPaimonV1Function = {
-    copy(arguments = newChildren)
+    if (filter.isDefined) {
+      copy(arguments = newChildren.dropRight(1), filter = 
Some(newChildren.last))
+    } else {
+      copy(arguments = newChildren)
+    }
+  }
+}
+
+object UnResolvedPaimonV1Function {
+
+  def apply(
+      funcIdent: FunctionIdentifier,
+      u: UnresolvedFunction,
+      fun: Option[PaimonFunction]): UnResolvedPaimonV1Function = {
+    UnResolvedPaimonV1Function(funcIdent, u.arguments, u.isDistinct, u.filter, 
u.ignoreNulls, fun)
   }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/function/FunctionResources.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/function/FunctionResources.scala
new file mode 100644
index 0000000000..1232c3ffa6
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/function/FunctionResources.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction}
+import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
+
+object FunctionResources {
+
+  val testUDFJarPath: String =
+    getClass.getClassLoader.getResource("function/hive-test-udfs.jar").getPath
+
+  val UDFExampleAdd2Class: String = 
"org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2"
+
+  val MyIntSumClass: String = "org.apache.paimon.spark.function.MyIntSum"
+}
+
+class MyIntSum extends UserDefinedAggregateFunction {
+
+  override def inputSchema: StructType = new StructType().add("input", 
IntegerType)
+
+  override def bufferSchema: StructType = new StructType().add("buffer", 
IntegerType)
+
+  override def dataType: DataType = IntegerType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, 0)
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    buffer.update(0, buffer.getInt(0) + input.getInt(0))
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0))
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    buffer.getInt(0)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
index 7ace245262..b1263cd8d2 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
@@ -20,11 +20,10 @@ package org.apache.paimon.spark.sql
 
 import org.apache.paimon.spark.PaimonHiveTestBase
 import org.apache.paimon.spark.catalog.functions.PaimonFunctions
+import org.apache.paimon.spark.function.FunctionResources.MyIntSumClass
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
-import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
-import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
 
 class PaimonFunctionTest extends PaimonHiveTestBase {
 
@@ -100,7 +99,7 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
   test("Paimon function: show and load function with SparkGenericCatalog") {
     sql(s"USE $sparkCatalogName")
     sql(s"USE $hiveDbName")
-    sql("CREATE FUNCTION myIntSum AS 'org.apache.paimon.spark.sql.MyIntSum'")
+    sql(s"CREATE FUNCTION myIntSum AS '$MyIntSumClass'")
     checkAnswer(
       sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"),
       Row("spark_catalog.test_hive.myintsum"))
@@ -170,30 +169,3 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
     }
   }
 }
-
-private class MyIntSum extends UserDefinedAggregateFunction {
-
-  override def inputSchema: StructType = new StructType().add("input", IntegerType)
-
-  override def bufferSchema: StructType = new StructType().add("buffer", IntegerType)
-
-  override def dataType: DataType = IntegerType
-
-  override def deterministic: Boolean = true
-
-  override def initialize(buffer: MutableAggregationBuffer): Unit = {
-    buffer.update(0, 0)
-  }
-
-  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
-    buffer.update(0, buffer.getInt(0) + input.getInt(0))
-  }
-
-  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
-    buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0))
-  }
-
-  override def evaluate(buffer: Row): Any = {
-    buffer.getInt(0)
-  }
-}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
index 5974d9809a..567ae910b6 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.sql
 
 import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
-import org.apache.paimon.spark.sql.FunctionResources._
+import org.apache.paimon.spark.function.FunctionResources._
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
@@ -157,6 +157,23 @@ abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBa
       sql("DROP FUNCTION max_pt")
     }
   }
+
+  // Registers MyIntSum once as a temporary and once as a non-temporary function and checks
+  // that it sums the three inserted ids to 6 in both cases.
+  test("Paimon V1 Function: user defined aggregate function") {
+    Seq(true, false).foreach {
+      isTemp =>
+        withUserDefinedFunction("myIntSum" -> isTemp) {
+          val createFunctionSql =
+            if (isTemp) s"CREATE TEMPORARY FUNCTION myIntSum AS '$MyIntSumClass'"
+            else s"CREATE FUNCTION myIntSum AS '$MyIntSumClass'"
+          sql(createFunctionSql)
+          withTable("t") {
+            sql("CREATE TABLE t (id INT) USING paimon")
+            sql("INSERT INTO t VALUES (1), (2), (3)")
+            val summed = sql("SELECT myIntSum(id) FROM t")
+            checkAnswer(summed, Row(6))
+          }
+        }
+    }
+  }
 }
 
 class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {
@@ -174,11 +191,3 @@ class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {
     }
   }
 }
-
-object FunctionResources {
-
-  val testUDFJarPath: String =
-    getClass.getClassLoader.getResource("function/hive-test-udfs.jar").getPath
-
-  val UDFExampleAdd2Class: String = "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2"
-}


Reply via email to