This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ee2528251b [spark] Add temporary v1 function and UDAF support to SparkCatalog (#6101)
ee2528251b is described below
commit ee2528251b4c975608820e0830d48c0a59aec223
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Aug 20 11:38:23 2025 +0800
[spark] Add temporary v1 function and UDAF support to SparkCatalog (#6101)
---
docs/content/spark/sql-functions.md | 12 +-
.../java/org/apache/paimon/spark/SparkCatalog.java | 18 ++-
.../paimon/spark/catalog/SupportV1Function.java | 10 +-
.../catalyst/analysis/PaimonFunctionResolver.scala | 26 ++--
.../spark/execution/PaimonFunctionExec.scala | 6 +-
.../extensions/PaimonSparkSessionExtensions.scala | 2 +-
.../catalog/PaimonV1FunctionRegistry.scala} | 133 +++++++++++++++++++--
.../extensions/RewritePaimonFunctionCommands.scala | 86 ++++++++-----
.../paimon/spark/function/FunctionResources.scala | 60 ++++++++++
.../paimon/spark/sql/PaimonFunctionTest.scala | 32 +----
.../spark/sql/PaimonV1FunctionTestBase.scala | 27 +++--
11 files changed, 298 insertions(+), 114 deletions(-)
diff --git a/docs/content/spark/sql-functions.md b/docs/content/spark/sql-functions.md
index 3c280c30ff..65019ac5fd 100644
--- a/docs/content/spark/sql-functions.md
+++ b/docs/content/spark/sql-functions.md
@@ -91,24 +91,26 @@ CALL sys.drop_function(`function` => 'my_db.area_func');
Users can define functions within a file, providing flexibility and modular support for function definition, only supports jar files now.
+Currently, supports Spark or Hive implementations of UDFs and UDAFs, see [Spark UDFs](https://spark.apache.org/docs/latest/sql-ref-functions.html#udfs-user-defined-functions)
+
This feature requires Spark 3.4 or higher.
**Example**
```sql
--- Create Function
-CREATE FUNCTION mydb.simple_udf
+-- Create Function or Temporary Function (Temporary function should not specify database name)
+CREATE [TEMPORARY] FUNCTION <mydb>.simple_udf
AS 'com.example.SimpleUdf'
USING JAR '/tmp/SimpleUdf.jar' [, JAR '/tmp/SimpleUdfR.jar'];
-- Create or Replace Function
-CREATE OR REPLACE FUNCTION mydb.simple_udf
+CREATE OR REPLACE FUNCTION <mydb>.simple_udf
AS 'com.example.SimpleUdf'
USING JAR '/tmp/SimpleUdf.jar';
-- Describe Function
-DESCRIBE FUNCTION [EXTENDED] mydb.simple_udf;
+DESCRIBE FUNCTION [EXTENDED] <mydb>.simple_udf;
-- Drop Function
-DROP FUNCTION mydb.simple_udf;
+DROP [TEMPORARY] FUNCTION <mydb>.simple_udf;
```
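For readers of the updated example above, here is a minimal sketch of the kind of class the `CREATE FUNCTION ... AS 'com.example.SimpleUdf' USING JAR` statement expects. Only the class name and jar path come from the docs example; the body below is an illustrative assumption (a Hive-style UDF, one of the implementations the docs say are supported) and is not part of this commit.

```scala
// Hypothetical body for com.example.SimpleUdf, packaged into /tmp/SimpleUdf.jar.
// Requires hive-exec on the compile classpath; shown only for illustration.
package com.example

import org.apache.hadoop.hive.ql.exec.UDF

class SimpleUdf extends UDF {
  // Hive resolves evaluate() by reflection; this variant simply doubles an int input.
  def evaluate(value: Int): Int = value * 2
}
```

Once the jar is registered with the statements above, the function can be invoked like any other SQL function, e.g. `SELECT simple_udf(col) FROM t` (table and column names here are placeholders).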
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index c172f4d60c..eee3991ad3 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -37,7 +37,6 @@ import org.apache.paimon.spark.catalog.SupportV1Function;
import org.apache.paimon.spark.catalog.SupportView;
import org.apache.paimon.spark.catalog.functions.PaimonFunctions;
import org.apache.paimon.spark.catalog.functions.V1FunctionConverter;
-import org.apache.paimon.spark.catalog.functions.V1FunctionRegistry;
import org.apache.paimon.spark.utils.CatalogUtils;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.types.DataField;
@@ -54,7 +53,9 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.catalyst.catalog.CatalogFunction;
+import org.apache.spark.sql.catalyst.catalog.PaimonV1FunctionRegistry;
import org.apache.spark.sql.catalyst.expressions.Expression;
+import org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Function;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
@@ -91,9 +92,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import scala.Option;
-import scala.collection.Seq;
-
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.FORMAT_TABLE;
@@ -125,7 +123,7 @@ public class SparkCatalog extends SparkBaseCatalog
private Catalog catalog;
private String defaultDatabase;
private boolean v1FunctionEnabled;
- @Nullable private V1FunctionRegistry v1FunctionRegistry;
+ @Nullable private PaimonV1FunctionRegistry v1FunctionRegistry;
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
@@ -144,7 +142,7 @@ public class SparkCatalog extends SparkBaseCatalog
SparkCatalogOptions.V1FUNCTION_ENABLED.defaultValue())
&& DelegateCatalog.rootCatalog(catalog) instanceof RESTCatalog;
if (v1FunctionEnabled) {
- this.v1FunctionRegistry = new V1FunctionRegistry(sparkSession);
+ this.v1FunctionRegistry = new PaimonV1FunctionRegistry(sparkSession);
}
try {
catalog.getDatabase(defaultDatabase);
@@ -674,7 +672,7 @@ public class SparkCatalog extends SparkBaseCatalog
return namespace.length == 0 || (namespace.length == 1 && namespaceExists(namespace));
}
- private V1FunctionRegistry v1FunctionRegistry() {
+ private PaimonV1FunctionRegistry v1FunctionRegistry() {
assert v1FunctionRegistry != null;
return v1FunctionRegistry;
}
@@ -685,7 +683,7 @@ public class SparkCatalog extends SparkBaseCatalog
}
@Override
- public Function getV1Function(FunctionIdentifier funcIdent) throws Exception {
+ public Function getFunction(FunctionIdentifier funcIdent) throws Exception {
return paimonCatalog().getFunction(V1FunctionConverter.fromFunctionIdentifier(funcIdent));
}
@@ -707,8 +705,8 @@ public class SparkCatalog extends SparkBaseCatalog
@Override
public Expression registerAndResolveV1Function(
- FunctionIdentifier funcIdent, Option<Function> func, Seq<Expression> arguments) {
- return v1FunctionRegistry().registerAndResolveFunction(funcIdent, func, arguments.toSeq());
+ UnResolvedPaimonV1Function unresolvedV1Function) {
+ return v1FunctionRegistry().registerAndResolveFunction(unresolvedV1Function);
}
@Override
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
index 2506713ef4..4c070c5949 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
@@ -23,9 +23,7 @@ import org.apache.paimon.function.Function;
import org.apache.spark.sql.catalyst.FunctionIdentifier;
import org.apache.spark.sql.catalyst.catalog.CatalogFunction;
import org.apache.spark.sql.catalyst.expressions.Expression;
-
-import scala.Option;
-import scala.collection.Seq;
+import org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Function;
/** Catalog supports v1 function. */
public interface SupportV1Function extends WithPaimonCatalog {
@@ -33,7 +31,7 @@ public interface SupportV1Function extends WithPaimonCatalog {
boolean v1FunctionEnabled();
/** Look up the function in the catalog. */
- Function getV1Function(FunctionIdentifier funcIdent) throws Exception;
+ Function getFunction(FunctionIdentifier funcIdent) throws Exception;
void createV1Function(CatalogFunction v1Function, boolean ignoreIfExists) throws Exception;
@@ -43,8 +41,8 @@ public interface SupportV1Function extends WithPaimonCatalog {
* Register the function and resolves it to an Expression if not registered, otherwise returns
* the registered Expression.
*/
- Expression registerAndResolveV1Function(
- FunctionIdentifier funcIdent, Option<Function> func, Seq<Expression> arguments);
+ Expression registerAndResolveV1Function(UnResolvedPaimonV1Function unresolvedV1Function)
+ throws Exception;
/** Unregister the func first, then drop it. */
void dropV1Function(FunctionIdentifier funcIdent, boolean ifExists) throws Exception;
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
index aa149e892a..507cbf79de 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
@@ -18,28 +18,34 @@
package org.apache.paimon.spark.catalyst.analysis
-import org.apache.paimon.function.{Function => PaimonFunction}
import org.apache.paimon.spark.catalog.SupportV1Function
-import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Function
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_FUNCTION
-case class PaimonFunctionResolver() extends Rule[LogicalPlan] {
+case class PaimonFunctionResolver(spark: SparkSession) extends Rule[LogicalPlan] {
+
+ protected lazy val catalogManager = spark.sessionState.catalogManager
override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
case l: LogicalPlan =>
l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
- case UnResolvedPaimonV1Function(
- v1FunctionCatalog: SupportV1Function,
- funcIdent: FunctionIdentifier,
- func: Option[PaimonFunction],
- arguments: Seq[Expression]) =>
- v1FunctionCatalog.registerAndResolveV1Function(funcIdent, func, arguments)
+ case u: UnResolvedPaimonV1Function =>
+ u.funcIdent.catalog match {
+ case Some(catalog) =>
+ catalogManager.catalog(catalog) match {
+ case v1FunctionCatalog: SupportV1Function =>
+ v1FunctionCatalog.registerAndResolveV1Function(u)
+ case _ =>
+ throw new IllegalArgumentException(
+ s"Catalog $catalog is not a v1 function catalog")
+ }
+ case None => throw new IllegalArgumentException("Catalog name is not defined")
+ }
}
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
index 0d0c7f1161..fb0a2ec014 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
@@ -68,8 +68,7 @@ case class DropPaimonV1FunctionCommand(
}
case class DescribePaimonV1FunctionCommand(
- catalog: SupportV1Function,
- funcIdent: FunctionIdentifier,
+ function: org.apache.paimon.function.Function,
isExtended: Boolean)
extends PaimonLeafRunnableCommand {
@@ -78,7 +77,6 @@ case class DescribePaimonV1FunctionCommand(
}
override def run(sparkSession: SparkSession): Seq[Row] = {
- val function = catalog.getV1Function(funcIdent)
val rows = new ArrayBuffer[Row]()
function.definition(FUNCTION_DEFINITION_NAME) match {
case functionDefinition: FunctionDefinition.FileFunctionDefinition =>
@@ -96,6 +94,6 @@ case class DescribePaimonV1FunctionCommand(
}
override def simpleString(maxFields: Int): String = {
- s"DescribePaimonV1FunctionCommand: $funcIdent"
+ s"DescribePaimonV1FunctionCommand: ${function.fullName()}"
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index f1a93c26de..8e363c5026 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -39,7 +39,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectResolutionRule(spark => new PaimonAnalysis(spark))
extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark))
extensions.injectResolutionRule(spark => PaimonViewResolver(spark))
- extensions.injectResolutionRule(_ => PaimonFunctionResolver())
+ extensions.injectResolutionRule(spark => PaimonFunctionResolver(spark))
extensions.injectResolutionRule(spark => SparkShimLoader.shim.createCustomResolution(spark))
extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark))
extensions.injectResolutionRule(spark => RewriteUpsertTable(spark))
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/PaimonV1FunctionRegistry.scala
similarity index 56%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/PaimonV1FunctionRegistry.scala
index c8a6def184..b5a70c329e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/PaimonV1FunctionRegistry.scala
@@ -16,22 +16,26 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.catalog.functions
+package org.apache.spark.sql.catalyst.catalog
import org.apache.paimon.function.{Function => PaimonFunction}
+import org.apache.paimon.spark.catalog.functions.V1FunctionConverter
import org.apache.spark.sql.{PaimonUtils, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper}
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, FunctionRegistry, FunctionRegistryBase, SimpleFunctionRegistry}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionExpressionBuilder, FunctionResource, FunctionResourceLoader}
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.expressions.{AggregateWindowFunction, Expression, ExpressionInfo, FrameLessOffsetWindowFunction, Lag, Lead, NthValue, WindowExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Function
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.BooleanType
import java.util.Locale
-case class V1FunctionRegistry(session: SparkSession) extends SQLConfHelper {
+case class PaimonV1FunctionRegistry(session: SparkSession) extends SQLConfHelper {
// ================== Start Public API ===================
@@ -39,16 +43,14 @@ case class V1FunctionRegistry(session: SparkSession) extends SQLConfHelper {
* Register the function and resolves it to an Expression if not registered, otherwise returns the
* registered Expression.
*/
- def registerAndResolveFunction(
- funcIdent: FunctionIdentifier,
- func: Option[PaimonFunction],
- arguments: Seq[Expression]): Expression = {
- resolvePersistentFunctionInternal(
- funcIdent,
- func,
- arguments,
+ def registerAndResolveFunction(u: UnResolvedPaimonV1Function): Expression = {
+ val resolvedFun = resolvePersistentFunctionInternal(
+ u.funcIdent,
+ u.func,
+ u.arguments,
functionRegistry,
makeFunctionBuilder)
+ validateFunction(resolvedFun, u.arguments.length, u)
}
/** Check if the function is registered. */
@@ -174,4 +176,111 @@ case class V1FunctionRegistry(session: SparkSession) extends SQLConfHelper {
protected def format(name: String): String = {
if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
}
+
+ private def validateFunction(
+ func: Expression,
+ numArgs: Int,
+ u: UnResolvedPaimonV1Function): Expression = {
+ func match {
+ // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
+ // the context of a Window clause. They do not need to be wrapped in an
+ // AggregateExpression.
+ case wf: AggregateWindowFunction =>
+ if (u.isDistinct) {
+ throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(wf.prettyName, "DISTINCT")
+ } else if (u.filter.isDefined) {
+ throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+ wf.prettyName,
+ "FILTER clause")
+ } else if (u.ignoreNulls) {
+ wf match {
+ case nthValue: NthValue =>
+ nthValue.copy(ignoreNulls = u.ignoreNulls)
+ case _ =>
+ throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+ wf.prettyName,
+ "IGNORE NULLS")
+ }
+ } else {
+ wf
+ }
+ case owf: FrameLessOffsetWindowFunction =>
+ if (u.isDistinct) {
+ throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+ owf.prettyName,
+ "DISTINCT")
+ } else if (u.filter.isDefined) {
+ throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+ owf.prettyName,
+ "FILTER clause")
+ } else if (u.ignoreNulls) {
+ owf match {
+ case lead: Lead =>
+ lead.copy(ignoreNulls = u.ignoreNulls)
+ case lag: Lag =>
+ lag.copy(ignoreNulls = u.ignoreNulls)
+ }
+ } else {
+ owf
+ }
+ // We get an aggregate function, we need to wrap it in an AggregateExpression.
+ case agg: AggregateFunction =>
+ // Note: PythonUDAF does not support these advanced clauses.
+ // For compatibility with spark3.4
+ if (agg.getClass.getName.equals("org.apache.spark.sql.catalyst.expressions.PythonUDAF"))
+ checkUnsupportedAggregateClause(agg, u)
+
+ u.filter match {
+ case Some(filter) if !filter.deterministic =>
+ throw new RuntimeException(
+ "FILTER expression is non-deterministic, it cannot be used in aggregate functions.")
+ case Some(filter) if filter.dataType != BooleanType =>
+ throw new RuntimeException(
+ "FILTER expression is not of type boolean. It cannot be used in an aggregate function.")
+ case Some(filter) if filter.exists(_.isInstanceOf[AggregateExpression]) =>
+ throw new RuntimeException(
+ "FILTER expression contains aggregate. It cannot be used in an aggregate function.")
+ case Some(filter) if filter.exists(_.isInstanceOf[WindowExpression]) =>
+ throw new RuntimeException(
+ "FILTER expression contains window function. It cannot be used in an aggregate function.")
+ case _ =>
+ }
+ if (u.ignoreNulls) {
+ val aggFunc = agg match {
+ case first: First => first.copy(ignoreNulls = u.ignoreNulls)
+ case last: Last => last.copy(ignoreNulls = u.ignoreNulls)
+ case any_value: AnyValue => any_value.copy(ignoreNulls = u.ignoreNulls)
+ case _ =>
+ throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+ agg.prettyName,
+ "IGNORE NULLS")
+ }
+ aggFunc.toAggregateExpression(u.isDistinct, u.filter)
+ } else {
+ agg.toAggregateExpression(u.isDistinct, u.filter)
+ }
+ // This function is not an aggregate function, just return the resolved one.
+ case other =>
+ checkUnsupportedAggregateClause(other, u)
+ other
+ }
+ }
+
+ private def checkUnsupportedAggregateClause(
+ func: Expression,
+ u: UnResolvedPaimonV1Function): Unit = {
+ if (u.isDistinct) {
+ throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(func.prettyName, "DISTINCT")
+ }
+ if (u.filter.isDefined) {
+ throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+ func.prettyName,
+ "FILTER clause")
+ }
+ if (u.ignoreNulls) {
+ throw QueryCompilationErrors.functionWithUnsupportedSyntaxError(
+ func.prettyName,
+ "IGNORE NULLS")
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
index 56c383f6d2..f295e3eef0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
@@ -28,7 +28,7 @@ import org.apache.paimon.spark.util.OptionUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, UnresolvedException, UnresolvedFunction, UnresolvedFunctionName, UnresolvedIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedException, UnresolvedFunction, UnresolvedFunctionName, UnresolvedIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.logical.{CreateFunction, DescribeFunction, DropFunction, LogicalPlan}
@@ -68,14 +68,20 @@ case class RewritePaimonFunctionCommands(spark: SparkSession)
if (isPaimonBuildInFunction(funcIdent)) {
throw new UnsupportedOperationException(s"Can't drop build-in function: $funcIdent")
}
+ // The function may be v1 function or not, anyway it can be safely deleted here.
DropPaimonV1FunctionCommand(v1FunctionCatalog, funcIdent, ifExists)
- case DescribeFunction(
+ case d @ DescribeFunction(
CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, funcIdent),
isExtended)
// For Paimon built-in functions, Spark will resolve them by itself.
if !isPaimonBuildInFunction(funcIdent) =>
- DescribePaimonV1FunctionCommand(v1FunctionCatalog, funcIdent, isExtended)
+ val function = v1FunctionCatalog.getFunction(funcIdent)
+ if (isPaimonV1Function(function)) {
+ DescribePaimonV1FunctionCommand(function, isExtended)
+ } else {
+ d
+ }
// Needs to be done here and transform to `UnResolvedPaimonV1Function`, so that spark's Analyzer can resolve
// the 'arguments' without throwing an exception, saying that function is not supported.
@@ -88,21 +94,13 @@ case class RewritePaimonFunctionCommands(spark: SparkSession)
if !isPaimonBuildInFunction(funcIdent) =>
// If the function is already registered, avoid redundant lookup in the catalog to reduce overhead.
if (v1FunctionCatalog.v1FunctionRegistered(funcIdent)) {
- UnResolvedPaimonV1Function(v1FunctionCatalog, funcIdent, None, u.arguments)
+ UnResolvedPaimonV1Function(funcIdent, u, None)
} else {
- val function = v1FunctionCatalog.getV1Function(funcIdent)
- function.definition(FUNCTION_DEFINITION_NAME) match {
- case _: FunctionDefinition.FileFunctionDefinition =>
- if (u.isDistinct && u.filter.isDefined) {
- throw new UnsupportedOperationException(
- s"DISTINCT with FILTER is not supported, func name: $funcIdent")
- }
- UnResolvedPaimonV1Function(
- v1FunctionCatalog,
- funcIdent,
- Some(function),
- u.arguments)
- case _ => u
+ val function = v1FunctionCatalog.getFunction(funcIdent)
+ if (isPaimonV1Function(function)) {
+ UnResolvedPaimonV1Function(funcIdent, u, Some(function))
+ } else {
+ u
}
}
case _ => u
@@ -125,12 +123,17 @@ case class RewritePaimonFunctionCommands(spark: SparkSession)
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, FunctionIdentifier)] = {
nameParts match {
- // Spark's built-in functions is without database name or catalog name.
- case Seq(funName) if isSparkBuiltInFunction(FunctionIdentifier(funName)) =>
+ // Spark's built-in or tmp functions is without database name or catalog name.
+ case Seq(funName) if isSparkBuiltInOrTmpFunction(FunctionIdentifier(funName)) =>
None
case CatalogAndIdentifier(v1FunctionCatalog: SupportV1Function, ident)
if v1FunctionCatalog.v1FunctionEnabled() =>
- Some(v1FunctionCatalog, FunctionIdentifier(ident.name(), Some(ident.namespace().last)))
+ Some(
+ v1FunctionCatalog,
+ FunctionIdentifier(
+ ident.name(),
+ Some(ident.namespace().last),
+ Some(v1FunctionCatalog.name)))
case _ =>
None
}
@@ -141,21 +144,31 @@ case class RewritePaimonFunctionCommands(spark: SparkSession)
PaimonFunctions.names.contains(funcIdent.funcName)
}
- private def isSparkBuiltInFunction(funcIdent: FunctionIdentifier): Boolean = {
- FunctionRegistry.builtin.functionExists(funcIdent)
+ private def isSparkBuiltInOrTmpFunction(funcIdent: FunctionIdentifier): Boolean = {
+ catalogManager.v1SessionCatalog.isBuiltinFunction(funcIdent) || catalogManager.v1SessionCatalog
+ .isTemporaryFunction(funcIdent)
+ }
+
+ private def isPaimonV1Function(fun: PaimonFunction): Boolean = {
+ fun.definition(FUNCTION_DEFINITION_NAME) match {
+ case _: FunctionDefinition.FileFunctionDefinition => true
+ case _ => false
+ }
}
}
/** An unresolved Paimon V1 function to let Spark resolve the necessary variables. */
case class UnResolvedPaimonV1Function(
- v1FunctionCatalog: SupportV1Function,
funcIdent: FunctionIdentifier,
- func: Option[PaimonFunction],
- arguments: Seq[Expression])
+ arguments: Seq[Expression],
+ isDistinct: Boolean,
+ filter: Option[Expression] = None,
+ ignoreNulls: Boolean = false,
+ func: Option[PaimonFunction] = None)
extends Expression
with Unevaluable {
- override def children: Seq[Expression] = arguments
+ override def children: Seq[Expression] = arguments ++ filter.toSeq
override def dataType: DataType = throw new UnresolvedException("dataType")
@@ -167,8 +180,27 @@ case class UnResolvedPaimonV1Function(
override def prettyName: String = funcIdent.identifier
+ override def toString: String = {
+ val distinct = if (isDistinct) "distinct " else ""
+ s"'$prettyName($distinct${children.mkString(", ")})"
+ }
+
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): UnResolvedPaimonV1Function = {
- copy(arguments = newChildren)
+ if (filter.isDefined) {
+ copy(arguments = newChildren.dropRight(1), filter = Some(newChildren.last))
+ } else {
+ copy(arguments = newChildren)
+ }
+ }
+}
+
+object UnResolvedPaimonV1Function {
+
+ def apply(
+ funcIdent: FunctionIdentifier,
+ u: UnresolvedFunction,
+ fun: Option[PaimonFunction]): UnResolvedPaimonV1Function = {
+ UnResolvedPaimonV1Function(funcIdent, u.arguments, u.isDistinct, u.filter, u.ignoreNulls, fun)
}
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/function/FunctionResources.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/function/FunctionResources.scala
new file mode 100644
index 0000000000..1232c3ffa6
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/function/FunctionResources.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.function
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
+
+object FunctionResources {
+
+ val testUDFJarPath: String =
+ getClass.getClassLoader.getResource("function/hive-test-udfs.jar").getPath
+
+ val UDFExampleAdd2Class: String = "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2"
+
+ val MyIntSumClass: String = "org.apache.paimon.spark.function.MyIntSum"
+}
+
+class MyIntSum extends UserDefinedAggregateFunction {
+
+ override def inputSchema: StructType = new StructType().add("input", IntegerType)
+
+ override def bufferSchema: StructType = new StructType().add("buffer", IntegerType)
+
+ override def dataType: DataType = IntegerType
+
+ override def deterministic: Boolean = true
+
+ override def initialize(buffer: MutableAggregationBuffer): Unit = {
+ buffer.update(0, 0)
+ }
+
+ override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+ buffer.update(0, buffer.getInt(0) + input.getInt(0))
+ }
+
+ override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+ buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0))
+ }
+
+ override def evaluate(buffer: Row): Any = {
+ buffer.getInt(0)
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
index 7ace245262..b1263cd8d2 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
@@ -20,11 +20,10 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.spark.PaimonHiveTestBase
import org.apache.paimon.spark.catalog.functions.PaimonFunctions
+import org.apache.paimon.spark.function.FunctionResources.MyIntSumClass
import org.apache.spark.sql.Row
import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
-import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
-import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
class PaimonFunctionTest extends PaimonHiveTestBase {
@@ -100,7 +99,7 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
test("Paimon function: show and load function with SparkGenericCatalog") {
sql(s"USE $sparkCatalogName")
sql(s"USE $hiveDbName")
- sql("CREATE FUNCTION myIntSum AS 'org.apache.paimon.spark.sql.MyIntSum'")
+ sql(s"CREATE FUNCTION myIntSum AS '$MyIntSumClass'")
checkAnswer(
sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"),
Row("spark_catalog.test_hive.myintsum"))
@@ -170,30 +169,3 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
}
}
}
-
-private class MyIntSum extends UserDefinedAggregateFunction {
-
- override def inputSchema: StructType = new StructType().add("input", IntegerType)
-
- override def bufferSchema: StructType = new StructType().add("buffer", IntegerType)
-
- override def dataType: DataType = IntegerType
-
- override def deterministic: Boolean = true
-
- override def initialize(buffer: MutableAggregationBuffer): Unit = {
- buffer.update(0, 0)
- }
-
- override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
- buffer.update(0, buffer.getInt(0) + input.getInt(0))
- }
-
- override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
- buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0))
- }
-
- override def evaluate(buffer: Row): Any = {
- buffer.getInt(0)
- }
-}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
index 5974d9809a..567ae910b6 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.sql
import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
-import org.apache.paimon.spark.sql.FunctionResources._
+import org.apache.paimon.spark.function.FunctionResources._
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
@@ -157,6 +157,23 @@ abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBa
sql("DROP FUNCTION max_pt")
}
}
+
+ test("Paimon V1 Function: user defined aggregate function") {
+ for (isTemp <- Seq(true, false)) {
+ withUserDefinedFunction("myIntSum" -> isTemp) {
+ if (isTemp) {
+ sql(s"CREATE TEMPORARY FUNCTION myIntSum AS '$MyIntSumClass'")
+ } else {
+ sql(s"CREATE FUNCTION myIntSum AS '$MyIntSumClass'")
+ }
+ withTable("t") {
+ sql("CREATE TABLE t (id INT) USING paimon")
+ sql("INSERT INTO t VALUES (1), (2), (3)")
+ checkAnswer(sql("SELECT myIntSum(id) FROM t"), Row(6))
+ }
+ }
+ }
+ }
}
class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {
@@ -174,11 +191,3 @@ class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {
}
}
}
-
-object FunctionResources {
-
- val testUDFJarPath: String =
- getClass.getClassLoader.getResource("function/hive-test-udfs.jar").getPath
-
- val UDFExampleAdd2Class: String = "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2"
-}