This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 397b67353b [spark] Add v1 function support to SparkCatalog (#6075)
397b67353b is described below
commit 397b67353bc520aee25b524da38fbdf3b031beac
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Aug 15 09:13:56 2025 +0800
[spark] Add v1 function support to SparkCatalog (#6075)
---
.gitignore | 1 +
docs/content/concepts/functions.md | 35 +---
docs/content/spark/sql-functions.md | 71 +++++++-
.../generated/spark_catalog_configuration.html | 6 +
.../java/org/apache/paimon/function/Function.java | 3 +
.../org/apache/paimon/function/FunctionImpl.java | 49 +++++-
paimon-spark/paimon-spark-3.2/pom.xml | 18 ++
.../extensions/RewritePaimonFunctionCommands.scala | 32 +---
paimon-spark/paimon-spark-3.3/pom.xml | 19 +++
.../extensions/RewritePaimonFunctionCommands.scala | 32 +---
paimon-spark/paimon-spark-3.4/pom.xml | 19 +++
.../src/test/resources/function/hive-test-udfs.jar | Bin 0 -> 35660 bytes
.../paimon/spark/sql/PaimonV1FunctionTest.scala | 30 +---
paimon-spark/paimon-spark-3.5/pom.xml | 19 +++
.../src/test/resources/function/hive-test-udfs.jar | Bin 0 -> 35660 bytes
.../paimon/spark/sql/PaimonV1FunctionTest.scala | 30 +---
paimon-spark/paimon-spark-4.0/pom.xml | 19 +++
.../src/test/resources/function/hive-test-udfs.jar | Bin 0 -> 35660 bytes
.../paimon/spark/sql/PaimonV1FunctionTest.scala | 30 +---
paimon-spark/paimon-spark-common/pom.xml | 12 ++
.../java/org/apache/paimon/spark/SparkCatalog.java | 85 +++++++++-
.../apache/paimon/spark/SparkCatalogOptions.java | 6 +
.../paimon/spark/catalog/SupportV1Function.java | 51 ++++++
.../spark/catalog/functions/PaimonFunctions.scala | 4 +-
.../catalog/functions/V1FunctionConverter.scala | 81 +++++++++
.../catalog/functions/V1FunctionRegistry.scala | 177 ++++++++++++++++++++
.../catalyst/analysis/PaimonFunctionResolver.scala | 45 +++++
.../spark/execution/PaimonFunctionExec.scala | 101 +++++++++++
.../extensions/PaimonSparkSessionExtensions.scala | 3 +-
.../org/apache/paimon/spark/util/OptionUtils.scala | 6 +-
.../scala/org/apache/spark/sql/PaimonUtils.scala | 8 +
.../AbstractPaimonSparkSqlExtensionsParser.scala | 16 +-
.../extensions/RewritePaimonFunctionCommands.scala | 174 +++++++++++++++++++
.../apache/spark/sql/paimon/shims/ClassicApi.scala | 2 +
.../src/test/resources/function/hive-test-udfs.jar | Bin 0 -> 35660 bytes
...la => PaimonSparkTestWithRestCatalogBase.scala} | 8 +-
.../procedure/AlterViewDialectProcedureTest.scala | 5 +-
.../spark/procedure/FunctionProcedureTest.scala | 17 +-
.../spark/sql/PaimonV1FunctionTestBase.scala | 184 +++++++++++++++++++++
.../spark/sql/paimon/shims/Classic3Api.scala | 5 +
.../spark/sql/paimon/shims/Classic4Api.scala | 6 +
paimon-spark/pom.xml | 15 ++
42 files changed, 1215 insertions(+), 209 deletions(-)
diff --git a/.gitignore b/.gitignore
index c4c2d54fe9..50d63fc69f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,6 +12,7 @@ target
*.iml
*.swp
*.jar
+!**/resources/**/*.jar
*.log
*.pyc
*.ipr
diff --git a/docs/content/concepts/functions.md
b/docs/content/concepts/functions.md
index 11d05f590b..e66cd07762 100644
--- a/docs/content/concepts/functions.md
+++ b/docs/content/concepts/functions.md
@@ -84,37 +84,6 @@ DROP FUNCTION mydb.parse_str;
This statement deletes the existing `parse_str` function from the `mydb` database, removing its functionality.
-## Lambda Function Usage in Spark
+## Functions in Spark
-### Create Function
-
-```sql
--- Spark SQL
-CALL sys.create_function(`function` => 'my_db.area_func',
-  `inputParams` => '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, "name":"width", "type":"INT"}]',
- `returnParams` => '[{"id": 0, "name":"area", "type":"BIGINT"}]',
- `deterministic` => true,
- `comment` => 'comment',
- `options` => 'k1=v1,k2=v2'
-);
-```
-
-### Alter Function
-
-```sql
--- Spark SQL
-CALL sys.alter_function(`function` => 'my_db.area_func',
-  `change` => '{"action" : "addDefinition", "name" : "spark", "definition" : {"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return (long) length * width; }", "language": "JAVA" } }'
-);
-```
-```sql
--- Spark SQL
-select paimon.my_db.area_func(1, 2);
-```
-
-### Drop Function
-
-```sql
--- Spark SQL
-CALL sys.drop_function(`function` => 'my_db.area_func');
-```
+See [SQL Functions]({{< ref "spark/sql-functions#user-defined-function" >}}).
\ No newline at end of file
diff --git a/docs/content/spark/sql-functions.md
b/docs/content/spark/sql-functions.md
index eda7eb0654..3c280c30ff 100644
--- a/docs/content/spark/sql-functions.md
+++ b/docs/content/spark/sql-functions.md
@@ -29,7 +29,9 @@ under the License.
This section introduces all available Paimon Spark functions.
-## max_pt
+## Built-in Function
+
+### max_pt
`max_pt($table_name)`
@@ -44,12 +46,69 @@ It would throw exception when:
**Example**
-```shell
-> SELECT max_pt('t');
- 20250101
+```sql
+SELECT max_pt('t');
+-- 20250101
-> SELECT * FROM t where pt = max_pt('t');
- a, 20250101
+SELECT * FROM t WHERE pt = max_pt('t');
+-- a, 20250101
```
**Since: 1.1.0**
+
+## User-defined Function
+
+Paimon Spark supports two types of user-defined functions: lambda functions and file-based functions.
+
+This feature currently only supports the REST catalog.
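+
+A REST catalog therefore needs to be configured first; a minimal sketch (the catalog name `paimon` and the `uri` value are placeholders for your deployment):
+
+```shell
+spark-sql \
+  --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
+  --conf spark.sql.catalog.paimon.metastore=rest \
+  --conf spark.sql.catalog.paimon.uri=<rest-server-url>
+```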
+
+### Lambda Function
+
+Lambda functions let users define functions with Java lambda expressions, enabling inline, concise, and functional-style operations.
+
+**Example**
+
+```sql
+-- Create Function
+CALL sys.create_function(`function` => 'my_db.area_func',
+  `inputParams` => '[{"id": 0, "name":"length", "type":"INT"}, {"id": 1, "name":"width", "type":"INT"}]',
+ `returnParams` => '[{"id": 0, "name":"area", "type":"BIGINT"}]',
+ `deterministic` => true,
+ `comment` => 'comment',
+ `options` => 'k1=v1,k2=v2'
+);
+
+-- Alter Function
+CALL sys.alter_function(`function` => 'my_db.area_func',
+  `change` => '{"action" : "addDefinition", "name" : "spark", "definition" : {"type" : "lambda", "definition" : "(Integer length, Integer width) -> { return (long) length * width; }", "language": "JAVA" } }'
+);
+
+-- Drop Function
+CALL sys.drop_function(`function` => 'my_db.area_func');
+```
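+
+Once created, the function can be invoked directly in Spark SQL (assuming the catalog is registered as `paimon`):
+
+```sql
+SELECT paimon.my_db.area_func(1, 2);
+-- 2
+```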
+
+### File Function
+
+Users can define functions within a file, providing flexible and modular function definitions. Only JAR files are supported for now.
+
+This feature requires Spark 3.4 or higher.
+
+**Example**
+
+```sql
+-- Create Function
+CREATE FUNCTION mydb.simple_udf
+AS 'com.example.SimpleUdf'
+USING JAR '/tmp/SimpleUdf.jar' [, JAR '/tmp/SimpleUdfR.jar'];
+
+-- Create or Replace Function
+CREATE OR REPLACE FUNCTION mydb.simple_udf
+AS 'com.example.SimpleUdf'
+USING JAR '/tmp/SimpleUdf.jar';
+
+-- Describe Function
+DESCRIBE FUNCTION [EXTENDED] mydb.simple_udf;
+
+-- Drop Function
+DROP FUNCTION mydb.simple_udf;
+```
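+
+Assuming `com.example.SimpleUdf` accepts a single argument, the created function can then be invoked like a built-in one (the table and column below are illustrative):
+
+```sql
+SELECT mydb.simple_udf(c1) FROM t;
+```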
diff --git a/docs/layouts/shortcodes/generated/spark_catalog_configuration.html
b/docs/layouts/shortcodes/generated/spark_catalog_configuration.html
index a3431959a1..c6d22093de 100644
--- a/docs/layouts/shortcodes/generated/spark_catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_catalog_configuration.html
@@ -38,5 +38,11 @@ under the License.
<td>String</td>
<td>The default database name.</td>
</tr>
+ <tr>
+ <td><h5>v1Function.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
            <td>Whether to enable v1 functions.</td>
+ </tr>
</tbody>
</table>
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
b/paimon-api/src/main/java/org/apache/paimon/function/Function.java
index b5d01e63c1..34a5b7e2ac 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++ b/paimon-api/src/main/java/org/apache/paimon/function/Function.java
@@ -18,6 +18,7 @@
package org.apache.paimon.function;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.types.DataField;
import java.util.List;
@@ -31,6 +32,8 @@ public interface Function {
String fullName();
+ Identifier identifier();
+
Optional<List<DataField>> inputParams();
Optional<List<DataField>> returnParams();
diff --git
a/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
b/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
index f617aa3a97..44b2e2246c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
+++ b/paimon-api/src/main/java/org/apache/paimon/function/FunctionImpl.java
@@ -21,8 +21,12 @@ package org.apache.paimon.function;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.types.DataField;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
/** Function implementation. */
@@ -30,25 +34,25 @@ public class FunctionImpl implements Function {
private final Identifier identifier;
- private final List<DataField> inputParams;
+ @Nullable private final List<DataField> inputParams;
- private final List<DataField> returnParams;
+ @Nullable private final List<DataField> returnParams;
private final boolean deterministic;
private final Map<String, FunctionDefinition> definitions;
- private final String comment;
+ @Nullable private final String comment;
private final Map<String, String> options;
public FunctionImpl(
Identifier identifier,
- List<DataField> inputParams,
- List<DataField> returnParams,
+ @Nullable List<DataField> inputParams,
+ @Nullable List<DataField> returnParams,
boolean deterministic,
Map<String, FunctionDefinition> definitions,
- String comment,
+ @Nullable String comment,
Map<String, String> options) {
this.identifier = identifier;
this.inputParams = inputParams;
@@ -66,7 +70,7 @@ public class FunctionImpl implements Function {
this.deterministic = true;
this.definitions = definitions;
this.comment = null;
- this.options = null;
+ this.options = Collections.emptyMap();
}
@Override
@@ -79,6 +83,10 @@ public class FunctionImpl implements Function {
return identifier.getFullName();
}
+ public Identifier identifier() {
+ return identifier;
+ }
+
@Override
public Optional<List<DataField>> inputParams() {
return Optional.ofNullable(inputParams);
@@ -113,4 +121,31 @@ public class FunctionImpl implements Function {
public Map<String, String> options() {
return options;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FunctionImpl function = (FunctionImpl) o;
+ return deterministic == function.deterministic
+ && Objects.equals(identifier, function.identifier)
+ && Objects.equals(inputParams, function.inputParams)
+ && Objects.equals(returnParams, function.returnParams)
+ && Objects.equals(definitions, function.definitions)
+ && Objects.equals(comment, function.comment)
+ && Objects.equals(options, function.options);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ identifier,
+ inputParams,
+ returnParams,
+ deterministic,
+ definitions,
+ comment,
+ options);
+ }
}
diff --git a/paimon-spark/paimon-spark-3.2/pom.xml
b/paimon-spark/paimon-spark-3.2/pom.xml
index 11b6e61ff1..cc62cde3b3 100644
--- a/paimon-spark/paimon-spark-3.2/pom.xml
+++ b/paimon-spark/paimon-spark-3.2/pom.xml
@@ -61,6 +61,18 @@ under the License.
<version>${spark.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-bundle</artifactId>
@@ -114,6 +126,12 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
similarity index 60%
copy from paimon-api/src/main/java/org/apache/paimon/function/Function.java
copy to
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
index b5d01e63c1..4698e48a09 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
@@ -16,32 +16,14 @@
* limitations under the License.
*/
-package org.apache.paimon.function;
+package org.apache.spark.sql.catalyst.parser.extensions
-import org.apache.paimon.types.DataField;
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+case class RewritePaimonFunctionCommands(spark: SparkSession) extends Rule[LogicalPlan] {
-/** Interface for function. */
-public interface Function {
-
- String name();
-
- String fullName();
-
- Optional<List<DataField>> inputParams();
-
- Optional<List<DataField>> returnParams();
-
- boolean isDeterministic();
-
- Map<String, FunctionDefinition> definitions();
-
- FunctionDefinition definition(String name);
-
- String comment();
-
- Map<String, String> options();
+  // No-op: v1 function commands are not supported on this Spark version, so the plan is returned unchanged.
+  override def apply(plan: LogicalPlan): LogicalPlan = plan
}
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml
b/paimon-spark/paimon-spark-3.3/pom.xml
index 4ced2186cb..22e4cf269c 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.3/pom.xml
@@ -61,6 +61,18 @@ under the License.
<version>${spark.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-bundle</artifactId>
@@ -113,6 +125,13 @@ under the License.
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
similarity index 60%
copy from paimon-api/src/main/java/org/apache/paimon/function/Function.java
copy to
paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
index b5d01e63c1..4698e48a09 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
@@ -16,32 +16,14 @@
* limitations under the License.
*/
-package org.apache.paimon.function;
+package org.apache.spark.sql.catalyst.parser.extensions
-import org.apache.paimon.types.DataField;
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+case class RewritePaimonFunctionCommands(spark: SparkSession) extends Rule[LogicalPlan] {
-/** Interface for function. */
-public interface Function {
-
- String name();
-
- String fullName();
-
- Optional<List<DataField>> inputParams();
-
- Optional<List<DataField>> returnParams();
-
- boolean isDeterministic();
-
- Map<String, FunctionDefinition> definitions();
-
- FunctionDefinition definition(String name);
-
- String comment();
-
- Map<String, String> options();
+  // No-op: v1 function commands are not supported on this Spark version, so the plan is returned unchanged.
+  override def apply(plan: LogicalPlan): LogicalPlan = plan
}
diff --git a/paimon-spark/paimon-spark-3.4/pom.xml
b/paimon-spark/paimon-spark-3.4/pom.xml
index 2a9b72915a..14d39806e7 100644
--- a/paimon-spark/paimon-spark-3.4/pom.xml
+++ b/paimon-spark/paimon-spark-3.4/pom.xml
@@ -60,6 +60,18 @@ under the License.
<version>${spark.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-bundle</artifactId>
@@ -112,6 +124,13 @@ under the License.
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/resources/function/hive-test-udfs.jar
b/paimon-spark/paimon-spark-3.4/src/test/resources/function/hive-test-udfs.jar
new file mode 100644
index 0000000000..a5bfa456f6
Binary files /dev/null and
b/paimon-spark/paimon-spark-3.4/src/test/resources/function/hive-test-udfs.jar
differ
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
similarity index 59%
copy from paimon-api/src/main/java/org/apache/paimon/function/Function.java
copy to
paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
index b5d01e63c1..f37fbad270 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
@@ -16,32 +16,6 @@
* limitations under the License.
*/
-package org.apache.paimon.function;
+package org.apache.paimon.spark.sql
-import org.apache.paimon.types.DataField;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/** Interface for function. */
-public interface Function {
-
- String name();
-
- String fullName();
-
- Optional<List<DataField>> inputParams();
-
- Optional<List<DataField>> returnParams();
-
- boolean isDeterministic();
-
- Map<String, FunctionDefinition> definitions();
-
- FunctionDefinition definition(String name);
-
- String comment();
-
- Map<String, String> options();
-}
+class PaimonV1FunctionTest extends PaimonV1FunctionTestBase {}
diff --git a/paimon-spark/paimon-spark-3.5/pom.xml
b/paimon-spark/paimon-spark-3.5/pom.xml
index 4be60e17a8..ee87e3e8da 100644
--- a/paimon-spark/paimon-spark-3.5/pom.xml
+++ b/paimon-spark/paimon-spark-3.5/pom.xml
@@ -60,6 +60,18 @@ under the License.
<version>${spark.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-bundle</artifactId>
@@ -139,6 +151,13 @@ under the License.
</exclusions>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/paimon-spark/paimon-spark-3.5/src/test/resources/function/hive-test-udfs.jar
b/paimon-spark/paimon-spark-3.5/src/test/resources/function/hive-test-udfs.jar
new file mode 100644
index 0000000000..a5bfa456f6
Binary files /dev/null and
b/paimon-spark/paimon-spark-3.5/src/test/resources/function/hive-test-udfs.jar
differ
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
similarity index 59%
copy from paimon-api/src/main/java/org/apache/paimon/function/Function.java
copy to
paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
index b5d01e63c1..f37fbad270 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
@@ -16,32 +16,6 @@
* limitations under the License.
*/
-package org.apache.paimon.function;
+package org.apache.paimon.spark.sql
-import org.apache.paimon.types.DataField;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/** Interface for function. */
-public interface Function {
-
- String name();
-
- String fullName();
-
- Optional<List<DataField>> inputParams();
-
- Optional<List<DataField>> returnParams();
-
- boolean isDeterministic();
-
- Map<String, FunctionDefinition> definitions();
-
- FunctionDefinition definition(String name);
-
- String comment();
-
- Map<String, String> options();
-}
+class PaimonV1FunctionTest extends PaimonV1FunctionTestBase {}
diff --git a/paimon-spark/paimon-spark-4.0/pom.xml
b/paimon-spark/paimon-spark-4.0/pom.xml
index 2087136d87..50158bbeb6 100644
--- a/paimon-spark/paimon-spark-4.0/pom.xml
+++ b/paimon-spark/paimon-spark-4.0/pom.xml
@@ -60,6 +60,18 @@ under the License.
<version>${spark.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-bundle</artifactId>
@@ -111,6 +123,13 @@ under the License.
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>${okhttp.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/paimon-spark/paimon-spark-4.0/src/test/resources/function/hive-test-udfs.jar
b/paimon-spark/paimon-spark-4.0/src/test/resources/function/hive-test-udfs.jar
new file mode 100644
index 0000000000..a5bfa456f6
Binary files /dev/null and
b/paimon-spark/paimon-spark-4.0/src/test/resources/function/hive-test-udfs.jar
differ
diff --git a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
similarity index 59%
copy from paimon-api/src/main/java/org/apache/paimon/function/Function.java
copy to
paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
index b5d01e63c1..f37fbad270 100644
--- a/paimon-api/src/main/java/org/apache/paimon/function/Function.java
+++
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTest.scala
@@ -16,32 +16,6 @@
* limitations under the License.
*/
-package org.apache.paimon.function;
+package org.apache.paimon.spark.sql
-import org.apache.paimon.types.DataField;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-/** Interface for function. */
-public interface Function {
-
- String name();
-
- String fullName();
-
- Optional<List<DataField>> inputParams();
-
- Optional<List<DataField>> returnParams();
-
- boolean isDeterministic();
-
- Map<String, FunctionDefinition> definitions();
-
- FunctionDefinition definition(String name);
-
- String comment();
-
- Map<String, String> options();
-}
+class PaimonV1FunctionTest extends PaimonV1FunctionTestBase {}
diff --git a/paimon-spark/paimon-spark-common/pom.xml
b/paimon-spark/paimon-spark-common/pom.xml
index 20cc34ff63..ad352e32d3 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -50,6 +50,18 @@ under the License.
<version>${spark.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 1f32e04903..53dc85eb44 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -22,16 +22,21 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.DelegateCatalog;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.function.Function;
import org.apache.paimon.function.FunctionDefinition;
import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalog;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.spark.catalog.FormatTableCatalog;
import org.apache.paimon.spark.catalog.SparkBaseCatalog;
+import org.apache.paimon.spark.catalog.SupportV1Function;
import org.apache.paimon.spark.catalog.SupportView;
import org.apache.paimon.spark.catalog.functions.PaimonFunctions;
+import org.apache.paimon.spark.catalog.functions.V1FunctionConverter;
+import org.apache.paimon.spark.catalog.functions.V1FunctionRegistry;
import org.apache.paimon.spark.utils.CatalogUtils;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.FormatTableOptions;
@@ -42,11 +47,14 @@ import org.apache.paimon.utils.TypeUtils;
import org.apache.spark.sql.PaimonSparkSession$;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.FunctionIdentifier;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction;
+import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
@@ -73,6 +81,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -81,6 +91,9 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import scala.Option;
+import scala.collection.Seq;
+
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.FORMAT_TABLE;
@@ -98,29 +111,41 @@ import static org.apache.paimon.spark.utils.CatalogUtils.toUpdateColumnDefaultVa
/** Spark {@link TableCatalog} for paimon. */
public class SparkCatalog extends SparkBaseCatalog
-        implements SupportView, FunctionCatalog, SupportsNamespaces, FormatTableCatalog {
+        implements SupportView,
+                SupportV1Function,
+                FunctionCatalog,
+                SupportsNamespaces,
+                FormatTableCatalog {
    private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);
public static final String FUNCTION_DEFINITION_NAME = "spark";
private static final String PRIMARY_KEY_IDENTIFIER = "primary-key";
- protected Catalog catalog = null;
-
+ private Catalog catalog;
private String defaultDatabase;
+ private boolean v1FunctionEnabled;
+ @Nullable private V1FunctionRegistry v1FunctionRegistry;
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
checkRequiredConfigurations();
-
+ SparkSession sparkSession = PaimonSparkSession$.MODULE$.active();
this.catalogName = name;
CatalogContext catalogContext =
CatalogContext.create(
-                        Options.fromMap(options),
-                        PaimonSparkSession$.MODULE$.active().sessionState().newHadoopConf());
+                        Options.fromMap(options), sparkSession.sessionState().newHadoopConf());
this.catalog = CatalogFactory.createCatalog(catalogContext);
        this.defaultDatabase =
                options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue());
+ this.v1FunctionEnabled =
+ options.getBoolean(
+ SparkCatalogOptions.V1FUNCTION_ENABLED.key(),
+                                SparkCatalogOptions.V1FUNCTION_ENABLED.defaultValue())
+                        && DelegateCatalog.rootCatalog(catalog) instanceof RESTCatalog;
+ if (v1FunctionEnabled) {
+ this.v1FunctionRegistry = new V1FunctionRegistry(sparkSession);
+ }
try {
catalog.getDatabase(defaultDatabase);
} catch (Catalog.DatabaseNotExistException e) {
@@ -616,9 +641,7 @@ public class SparkCatalog extends SparkBaseCatalog
            Function paimonFunction = catalog.getFunction(toIdentifier(ident));
            FunctionDefinition functionDefinition =
                    paimonFunction.definition(FUNCTION_DEFINITION_NAME);
-            if (functionDefinition != null
-                    && functionDefinition
-                            instanceof FunctionDefinition.LambdaFunctionDefinition) {
+            if (functionDefinition instanceof FunctionDefinition.LambdaFunctionDefinition) {
                FunctionDefinition.LambdaFunctionDefinition lambdaFunctionDefinition =
                        (FunctionDefinition.LambdaFunctionDefinition) functionDefinition;
if (paimonFunction.returnParams().isPresent()) {
@@ -651,6 +674,50 @@ public class SparkCatalog extends SparkBaseCatalog
        return namespace.length == 0 || (namespace.length == 1 && namespaceExists(namespace));
}
+ private V1FunctionRegistry v1FunctionRegistry() {
+ assert v1FunctionRegistry != null;
+ return v1FunctionRegistry;
+ }
+
+ @Override
+ public boolean v1FunctionEnabled() {
+ return v1FunctionEnabled;
+ }
+
+ @Override
+    public Function getV1Function(FunctionIdentifier funcIdent) throws Exception {
+        return paimonCatalog().getFunction(V1FunctionConverter.fromFunctionIdentifier(funcIdent));
+    }
+
+    @Override
+    public void createV1Function(CatalogFunction v1Function, boolean ignoreIfExists)
+            throws Exception {
+        Function paimonFunction = V1FunctionConverter.fromV1Function(v1Function);
+        paimonCatalog()
+                .createFunction(
+                        V1FunctionConverter.fromFunctionIdentifier(v1Function.identifier()),
+                        paimonFunction,
+                        ignoreIfExists);
+    }
+
+    @Override
+    public boolean v1FunctionRegistered(FunctionIdentifier funcIdent) {
+        return v1FunctionRegistry().isRegistered(funcIdent);
+    }
+
+    @Override
+    public Expression registerAndResolveV1Function(
+            FunctionIdentifier funcIdent, Option<Function> func, Seq<Expression> arguments) {
+        return v1FunctionRegistry().registerAndResolveFunction(funcIdent, func, arguments.toSeq());
+    }
+
+    @Override
+    public void dropV1Function(FunctionIdentifier funcIdent, boolean ifExists) throws Exception {
+        v1FunctionRegistry().unregisterFunction(funcIdent);
+        paimonCatalog()
+                .dropFunction(V1FunctionConverter.fromFunctionIdentifier(funcIdent), ifExists);
+    }
+
private PropertyChange toPropertyChange(NamespaceChange change) {
if (change instanceof NamespaceChange.SetProperty) {
NamespaceChange.SetProperty set = (NamespaceChange.SetProperty)
change;
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java
index d990d4c864..f069e00d48 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalogOptions.java
@@ -37,4 +37,10 @@ public class SparkCatalogOptions {
.stringType()
.defaultValue(Catalog.DEFAULT_DATABASE)
.withDescription("The default database name.");
+
+ public static final ConfigOption<Boolean> V1FUNCTION_ENABLED =
+ key("v1Function.enabled")
+ .booleanType()
+ .defaultValue(true)
                    .withDescription("Whether to enable v1 functions.");
}
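
As a side note, a minimal sketch of wiring this option in from the Spark side (the catalog name `paimon` and the builder setup are assumptions, not part of this commit; per `SparkCatalog.initialize` above, the flag only takes effect when the root catalog is a REST catalog):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical setup: register SparkCatalog under the name "paimon" and
// disable v1 function support via the option defined above.
val spark = SparkSession
  .builder()
  .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
  .config("spark.sql.catalog.paimon.v1Function.enabled", "false")
  .getOrCreate()
```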
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
new file mode 100644
index 0000000000..2506713ef4
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportV1Function.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalog;
+
+import org.apache.paimon.function.Function;
+
+import org.apache.spark.sql.catalyst.FunctionIdentifier;
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction;
+import org.apache.spark.sql.catalyst.expressions.Expression;
+
+import scala.Option;
+import scala.collection.Seq;
+
+/** Catalog that supports v1 functions. */
+public interface SupportV1Function extends WithPaimonCatalog {
+
+ boolean v1FunctionEnabled();
+
+    /** Looks up the function in the catalog. */
+ Function getV1Function(FunctionIdentifier funcIdent) throws Exception;
+
+    void createV1Function(CatalogFunction v1Function, boolean ignoreIfExists) throws Exception;
+
+ boolean v1FunctionRegistered(FunctionIdentifier funcIdent);
+
+    /**
+     * Registers the function and resolves it to an Expression if it is not yet registered;
+     * otherwise returns the already registered Expression.
+     */
+    Expression registerAndResolveV1Function(
+            FunctionIdentifier funcIdent, Option<Function> func, Seq<Expression> arguments);
+
+    /** Unregisters the function first, then drops it. */
+    void dropV1Function(FunctionIdentifier funcIdent, boolean ifExists) throws Exception;
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
index f04edf88c4..2876f7e7f1 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/PaimonFunctions.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.catalog.functions
import org.apache.paimon.CoreOptions.BucketFunctionType
import org.apache.paimon.bucket
import org.apache.paimon.data.serializer.InternalRowSerializer
-import org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableList, ImmutableMap}
+import org.apache.paimon.shade.guava30.com.google.common.collect.{ImmutableMap, ImmutableSet}
import org.apache.paimon.spark.SparkInternalRowWrapper
import org.apache.paimon.spark.SparkTypeUtils.toPaimonRowType
import org.apache.paimon.spark.catalog.functions.PaimonFunctions._
@@ -61,7 +61,7 @@ object PaimonFunctions {
MOD_BUCKET
)
- val names: ImmutableList[String] = FUNCTIONS.keySet.asList()
+ val names: ImmutableSet[String] = FUNCTIONS.keySet
def bucketFunctionName(funcType: BucketFunctionType): String =
TYPE_FUNC_MAPPING.get(funcType)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionConverter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionConverter.scala
new file mode 100644
index 0000000000..be4daa86b9
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionConverter.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalog.functions
+
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.function.{Function, FunctionDefinition, FunctionImpl}
+import org.apache.paimon.spark.SparkCatalog.FUNCTION_DEFINITION_NAME
+
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource, FunctionResourceType}
+
+import scala.collection.JavaConverters._
+
+object V1FunctionConverter {
+
+ /** Converts spark [[FunctionIdentifier]] to paimon [[Identifier]]. */
+ def fromFunctionIdentifier(ident: FunctionIdentifier): Identifier = {
+ new Identifier(ident.database.get, ident.funcName)
+ }
+
+ /** Converts paimon [[Identifier]] to spark [[FunctionIdentifier]]. */
+ def toFunctionIdentifier(ident: Identifier): FunctionIdentifier = {
+ new FunctionIdentifier(ident.getObjectName, Some(ident.getDatabaseName))
+ }
+
+ /** Converts spark [[CatalogFunction]] to paimon [[Function]]. */
+ def fromV1Function(v1Function: CatalogFunction): Function = {
+ val functionIdentifier = v1Function.identifier
+ val identifier = fromFunctionIdentifier(functionIdentifier)
+ val fileResources = v1Function.resources
+      .map(r => new FunctionDefinition.FunctionFileResource(r.resourceType.resourceType, r.uri))
+ .toList
+
+ val functionDefinition: FunctionDefinition = FunctionDefinition.file(
+ fileResources.asJava,
+      "JAVA", // Apache Spark only supports JAR-based persistent functions for now.
+ v1Function.className,
+ functionIdentifier.funcName)
+    val definitions = Map(FUNCTION_DEFINITION_NAME -> functionDefinition).asJava
+
+ new FunctionImpl(identifier, definitions)
+ }
+
+ /** Converts paimon [[Function]] to spark [[CatalogFunction]]. */
+ def toV1Function(paimonFunction: Function): CatalogFunction = {
+ paimonFunction.definition(FUNCTION_DEFINITION_NAME) match {
+ case functionDefinition: FunctionDefinition.FileFunctionDefinition =>
+ val fileResources = functionDefinition
+ .fileResources()
+ .asScala
+        .map(r => FunctionResource(FunctionResourceType.fromString(r.resourceType()), r.uri()))
+ .toSeq
+
+ CatalogFunction(
+ new FunctionIdentifier(
+ paimonFunction.name(),
+ Some(paimonFunction.identifier().getDatabaseName)),
+ functionDefinition.className(),
+ fileResources)
+
+ case other =>
+        throw new UnsupportedOperationException(s"Unsupported function definition $other")
+ }
+ }
+}
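
A quick round-trip sketch of the identifier conversion above (illustrative only; note that `fromFunctionIdentifier` calls `ident.database.get`, so the Spark identifier must be database-qualified):

```scala
import org.apache.paimon.spark.catalog.functions.V1FunctionConverter
import org.apache.spark.sql.catalyst.FunctionIdentifier

// Spark -> Paimon -> Spark round trip for a database-qualified identifier.
val sparkIdent = FunctionIdentifier("area_func", Some("my_db"))
val paimonIdent = V1FunctionConverter.fromFunctionIdentifier(sparkIdent)
assert(paimonIdent.getFullName == "my_db.area_func")
assert(V1FunctionConverter.toFunctionIdentifier(paimonIdent) == sparkIdent)
```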
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala
new file mode 100644
index 0000000000..c8a6def184
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalog/functions/V1FunctionRegistry.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalog.functions
+
+import org.apache.paimon.function.{Function => PaimonFunction}
+
+import org.apache.spark.sql.{PaimonUtils, SparkSession}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper}
+import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, FunctionRegistry, FunctionRegistryBase, SimpleFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionExpressionBuilder, FunctionResource, FunctionResourceLoader}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
+import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+
+import java.util.Locale
+
+case class V1FunctionRegistry(session: SparkSession) extends SQLConfHelper {
+
+ // ================== Start Public API ===================
+
+  /**
+   * Registers the function and resolves it to an Expression if it is not yet registered;
+   * otherwise returns the already registered Expression.
+   */
+ def registerAndResolveFunction(
+ funcIdent: FunctionIdentifier,
+ func: Option[PaimonFunction],
+ arguments: Seq[Expression]): Expression = {
+ resolvePersistentFunctionInternal(
+ funcIdent,
+ func,
+ arguments,
+ functionRegistry,
+ makeFunctionBuilder)
+ }
+
+ /** Check if the function is registered. */
+ def isRegistered(funcIdent: FunctionIdentifier): Boolean = {
+ val qualifiedIdent = qualifyIdentifier(funcIdent)
+ functionRegistry.functionExists(qualifiedIdent)
+ }
+
+ /** Unregister the function. */
+ def unregisterFunction(funcIdent: FunctionIdentifier): Unit = {
+ val qualifiedIdent = qualifyIdentifier(funcIdent)
+ if (functionRegistry.functionExists(qualifiedIdent)) {
+ functionRegistry.dropFunction(qualifiedIdent)
+ }
+ }
+
+ // ================== End Public API ===================
+
+  // Mostly copied from Spark.
+  private val functionResourceLoader: FunctionResourceLoader =
+    SparkShimLoader.shim.classicApi.sessionResourceLoader(session)
+  private val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry
+  private val functionExpressionBuilder: FunctionExpressionBuilder = HiveUDFExpressionBuilder
+
+  /** Looks up a persistent scalar function by name and resolves it to an Expression. */
+ private def resolvePersistentFunctionInternal[T](
+ funcIdent: FunctionIdentifier,
+ func: Option[PaimonFunction],
+ arguments: Seq[Expression],
+ registry: FunctionRegistryBase[T],
+      createFunctionBuilder: CatalogFunction => FunctionRegistryBase[T]#FunctionBuilder): T = {
+
+ val name = funcIdent
+    // `synchronized` is used to prevent multiple threads from concurrently resolving the
+    // same function that has not yet been loaded into the function registry. This is needed
+    // because calling `registerFunction` twice with `overrideIfExists = false` can lead to
+    // a FunctionAlreadyExistsException.
+ synchronized {
+ val qualifiedIdent = qualifyIdentifier(name)
+ if (registry.functionExists(qualifiedIdent)) {
+        // This function has already been loaded into the function registry.
+ registry.lookupFunction(qualifiedIdent, arguments)
+ } else {
+        // The function has not been loaded into the function registry, which means
+        // that the function is a persistent function (if it actually has been registered
+        // in the metastore). We need to first put the function in the function registry.
+ require(func.isDefined, "Function must be defined")
+ val catalogFunction = V1FunctionConverter.toV1Function(func.get)
+ loadFunctionResources(catalogFunction.resources)
+        // Please note that qualifiedName is provided by the user. However,
+        // catalogFunction.identifier.unquotedString is returned by the underlying
+        // catalog. So, it is possible that qualifiedName is not exactly the same as
+        // catalogFunction.identifier.unquotedString (the difference is in case sensitivity).
+        // Here, we preserve the input from the user.
+ val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
+ registerFunction(
+ funcMetadata,
+ overrideIfExists = false,
+ registry = registry,
+ functionBuilder = createFunctionBuilder(funcMetadata))
+ // Now, we need to create the Expression.
+ registry.lookupFunction(qualifiedIdent, arguments)
+ }
+ }
+ }
+
+  /**
+   * Loads resources such as JARs and Files for a function. Every resource is represented by a
+   * tuple (resource type, resource uri).
+   */
+ private def loadFunctionResources(resources: Seq[FunctionResource]): Unit = {
+ resources.foreach(functionResourceLoader.loadResource)
+ }
+
+ private def registerFunction[T](
+ funcDefinition: CatalogFunction,
+ overrideIfExists: Boolean,
+ registry: FunctionRegistryBase[T],
+ functionBuilder: FunctionRegistryBase[T]#FunctionBuilder): Unit = {
+ val func = funcDefinition.identifier
+ if (registry.functionExists(func) && !overrideIfExists) {
+ throw new FunctionAlreadyExistsException(func.nameParts)
+ }
+ val info = makeExprInfoForHiveFunction(funcDefinition)
+ registry.registerFunction(func, info, functionBuilder)
+ }
+
+  private def makeExprInfoForHiveFunction(func: CatalogFunction): ExpressionInfo = {
+ new ExpressionInfo(
+ func.className,
+ func.identifier.database.orNull,
+ func.identifier.funcName,
+ null,
+ "",
+ "",
+ "",
+ "",
+ "",
+ "",
+ "hive")
+ }
+
+  /** Constructs a [[FunctionBuilder]] based on the provided function metadata. */
+ private def makeFunctionBuilder(func: CatalogFunction): FunctionBuilder = {
+ val className = func.className
+ if (!PaimonUtils.classIsLoadable(className)) {
+ throw new IllegalArgumentException(s"Cannot load class: $className")
+ }
+ val clazz = PaimonUtils.classForName(className)
+ val name = func.identifier.unquotedString
+ (input) => functionExpressionBuilder.makeExpression(name, clazz, input)
+ }
+
+  /**
+   * Qualifies the function identifier with the current database if not specified, and
+   * normalizes all the names.
+   */
+  private def qualifyIdentifier(ident: FunctionIdentifier): FunctionIdentifier = {
+    FunctionIdentifier(funcName = format(ident.funcName), database = ident.database)
+ }
+
+ /** Formats object names, taking into account case sensitivity. */
+ protected def format(name: String): String = {
+ if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+ }
+}
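
A rough sketch of how the registry is driven (the names and the `???` stub are placeholders; in the actual flow `SparkCatalog.registerAndResolveV1Function` reaches this through the `SupportV1Function` interface):

```scala
import org.apache.paimon.function.{Function => PaimonFunction}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

// The first call loads the function's JAR resources and registers
// "mydb.simple_udf"; later calls resolve from the registry directly.
val registry = V1FunctionRegistry(SparkSession.active)
val ident = FunctionIdentifier("simple_udf", Some("mydb"))
val paimonFunction: PaimonFunction = ??? // in reality: paimonCatalog().getFunction(...)
val resolved: Expression =
  registry.registerAndResolveFunction(ident, Some(paimonFunction), Seq(Literal("input")))
```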
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
new file mode 100644
index 0000000000..aa149e892a
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonFunctionResolver.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.function.{Function => PaimonFunction}
+import org.apache.paimon.spark.catalog.SupportV1Function
+
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.extensions.UnResolvedPaimonV1Function
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_FUNCTION
+
+case class PaimonFunctionResolver() extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    plan.resolveOperatorsUpWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
+      case l: LogicalPlan =>
+        l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
+          case UnResolvedPaimonV1Function(
+                v1FunctionCatalog: SupportV1Function,
+                funcIdent: FunctionIdentifier,
+                func: Option[PaimonFunction],
+                arguments: Seq[Expression]) =>
+            v1FunctionCatalog.registerAndResolveV1Function(funcIdent, func, arguments)
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
new file mode 100644
index 0000000000..0d0c7f1161
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonFunctionExec.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.function.FunctionDefinition
+import org.apache.paimon.spark.SparkCatalog.FUNCTION_DEFINITION_NAME
+import org.apache.paimon.spark.catalog.SupportV1Function
+import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.types.StringType
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class CreatePaimonV1FunctionCommand(
+ catalog: SupportV1Function,
+ v1Function: CatalogFunction,
+ ignoreIfExists: Boolean,
+ replace: Boolean)
+ extends PaimonLeafRunnableCommand {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+    // Note: replace is implemented as drop-then-create; this operation is non-atomic.
+ if (replace) {
+ catalog.dropV1Function(v1Function.identifier, true)
+ }
+ catalog.createV1Function(v1Function, ignoreIfExists)
+ Nil
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"CreatePaimonV1FunctionCommand: ${v1Function.identifier}"
+ }
+}
+
+case class DropPaimonV1FunctionCommand(
+ catalog: SupportV1Function,
+ funcIdent: FunctionIdentifier,
+ ifExists: Boolean)
+ extends PaimonLeafRunnableCommand {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ catalog.dropV1Function(funcIdent, ifExists)
+ Nil
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"DropPaimonV1FunctionCommand: $funcIdent"
+ }
+}
+
+case class DescribePaimonV1FunctionCommand(
+ catalog: SupportV1Function,
+ funcIdent: FunctionIdentifier,
+ isExtended: Boolean)
+ extends PaimonLeafRunnableCommand {
+
+ override val output: Seq[Attribute] = {
+ Seq(AttributeReference("function_desc", StringType, nullable = false)())
+ }
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val function = catalog.getV1Function(funcIdent)
+ val rows = new ArrayBuffer[Row]()
+ function.definition(FUNCTION_DEFINITION_NAME) match {
+ case functionDefinition: FunctionDefinition.FileFunctionDefinition =>
+ rows += Row(s"Function: ${function.fullName()}")
+ rows += Row(s"Class: ${functionDefinition.className()}")
+ if (isExtended) {
+ rows += Row(
+            s"File Resources: ${functionDefinition.fileResources().asScala.map(_.uri()).mkString(", ")}")
+ }
+ case other =>
+        throw new UnsupportedOperationException(s"Unsupported function definition $other")
+ }
+
+ rows.toSeq
+ }
+
+ override def simpleString(maxFields: Int): String = {
+ s"DescribePaimonV1FunctionCommand: $funcIdent"
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 9a7f62e6db..f1a93c26de 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.extensions
-import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions, RewriteUpsertTable}
+import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonFunctionResolver, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions, RewriteUpsertTable}
import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries}
import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
import org.apache.paimon.spark.commands.BucketExpression
@@ -39,6 +39,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectResolutionRule(spark => new PaimonAnalysis(spark))
extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark))
extensions.injectResolutionRule(spark => PaimonViewResolver(spark))
+ extensions.injectResolutionRule(_ => PaimonFunctionResolver())
    extensions.injectResolutionRule(spark => SparkShimLoader.shim.createCustomResolution(spark))
    extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark))
extensions.injectResolutionRule(spark => RewriteUpsertTable(spark))
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index 2acfc4665d..dd35dd394d 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.util
import org.apache.paimon.catalog.Identifier
import org.apache.paimon.options.ConfigOption
-import org.apache.paimon.spark.SparkConnectorOptions
+import org.apache.paimon.spark.{SparkCatalogOptions, SparkConnectorOptions}
import org.apache.paimon.table.Table
import org.apache.spark.sql.catalyst.SQLConfHelper
@@ -71,6 +71,10 @@ object OptionUtils extends SQLConfHelper {
getOptionString(SparkConnectorOptions.EXPLICIT_CAST).toBoolean
}
+ def v1FunctionEnabled(): Boolean = {
+ getOptionString(SparkCatalogOptions.V1FUNCTION_ENABLED).toBoolean
+ }
+
private def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = {
val mergedOptions = new JHashMap[String, String](
conf.getAllConfs
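
The new switch is read from the catalog options; a hedged sketch of toggling it,
using the option key exercised by DisablePaimonV1FunctionTest later in this patch:

    import org.apache.spark.SparkConf

    // Disable v1 function support for the "paimon" catalog before the session starts.
    val conf = new SparkConf()
      .set("spark.sql.catalog.paimon.v1Function.enabled", "false")
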
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 885910dbc2..8f46f25518 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -126,4 +126,12 @@ object PaimonUtils {
def sameType(left: DataType, right: DataType): Boolean = {
left.sameType(right)
}
+
+ def classIsLoadable(clazz: String): Boolean = {
+ SparkUtils.classIsLoadable(clazz)
+ }
+
+ def classForName(clazz: String): Class[_] = {
+ SparkUtils.classForName(clazz)
+ }
}
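
A small sketch of how these pass-through helpers can guard a reflective load
(the method name tryLoad and the class name are illustrative):

    import org.apache.spark.sql.PaimonUtils

    // Hedged sketch: probe loadability before reflective access.
    def tryLoad(className: String): Option[Class[_]] =
      if (PaimonUtils.classIsLoadable(className)) Some(PaimonUtils.classForName(className))
      else None
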
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index 73e1ea3ec8..2e84df8027 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
import
org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser.{NonReservedContext,
QuotedIdentifierContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.VariableSubstitution
import org.apache.spark.sql.types.{DataType, StructType}
@@ -65,10 +66,23 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val
delegate: ParserInterf
parse(sqlTextAfterSubstitution)(parser =>
astBuilder.visit(parser.singleStatement()))
.asInstanceOf[LogicalPlan]
} else {
-      RewritePaimonViewCommands(PaimonSparkSession.active).apply(delegate.parsePlan(sqlText))
+ var plan = delegate.parsePlan(sqlText)
+ val sparkSession = PaimonSparkSession.active
+ parserRules(sparkSession).foreach(
+ rule => {
+ plan = rule.apply(plan)
+ })
+ plan
}
}
+  private def parserRules(sparkSession: SparkSession): Seq[Rule[LogicalPlan]] = {
+ Seq(
+ RewritePaimonViewCommands(sparkSession),
+ RewritePaimonFunctionCommands(sparkSession)
+ )
+ }
+
/** Parses a string to an Expression. */
override def parseExpression(sqlText: String): Expression =
delegate.parseExpression(sqlText)
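
The loop above threads the parsed plan through each rewrite rule in order; an
equivalent immutable formulation (a sketch, not part of the patch) would be:

    // Sketch: the same rule threading, expressed as a fold.
    private def applyParserRules(session: SparkSession, plan: LogicalPlan): LogicalPlan =
      parserRules(session).foldLeft(plan)((p, rule) => rule.apply(p))
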
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
new file mode 100644
index 0000000000..56c383f6d2
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.paimon.function.{Function => PaimonFunction}
+import org.apache.paimon.function.FunctionDefinition
+import org.apache.paimon.spark.SparkCatalog.FUNCTION_DEFINITION_NAME
+import org.apache.paimon.spark.catalog.SupportV1Function
+import org.apache.paimon.spark.catalog.functions.PaimonFunctions
+import org.apache.paimon.spark.execution.{CreatePaimonV1FunctionCommand,
DescribePaimonV1FunctionCommand, DropPaimonV1FunctionCommand}
+import org.apache.paimon.spark.util.OptionUtils
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry,
UnresolvedException, UnresolvedFunction, UnresolvedFunctionName,
UnresolvedIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateFunction,
DescribeFunction, DropFunction, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern,
UNRESOLVED_FUNCTION}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin,
LookupCatalog}
+import org.apache.spark.sql.types.DataType
+
+case class RewritePaimonFunctionCommands(spark: SparkSession)
+ extends Rule[LogicalPlan]
+ with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+    // Global switch to enable/disable v1 function support.
+ if (!OptionUtils.v1FunctionEnabled()) {
+ return plan
+ }
+
+ plan.resolveOperatorsUp {
+      case CreateFunction(
+          CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, funcIdent),
+          className,
+          resources,
+          ifExists,
+          replace) =>
+          if (isPaimonBuildInFunction(funcIdent)) {
+            throw new UnsupportedOperationException(s"Can't create built-in function: $funcIdent")
+          }
+          val v1Function = CatalogFunction(funcIdent, className, resources)
+          CreatePaimonV1FunctionCommand(v1FunctionCatalog, v1Function, ifExists, replace)
+
+      case DropFunction(
+          CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, funcIdent),
+          ifExists) =>
+          if (isPaimonBuildInFunction(funcIdent)) {
+            throw new UnsupportedOperationException(s"Can't drop built-in function: $funcIdent")
+          }
+          DropPaimonV1FunctionCommand(v1FunctionCatalog, funcIdent, ifExists)
+
+      case DescribeFunction(
+          CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, funcIdent),
+          isExtended)
+          // For Paimon built-in functions, Spark will resolve them by itself.
+          if !isPaimonBuildInFunction(funcIdent) =>
+          DescribePaimonV1FunctionCommand(v1FunctionCatalog, funcIdent, isExtended)
+
+      // This must be done here, transforming to `UnResolvedPaimonV1Function`, so that
+      // Spark's Analyzer can resolve the 'arguments' without throwing an exception
+      // saying that the function is not supported.
+      case l: LogicalPlan =>
+        l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
+          case u: UnresolvedFunction =>
+            CatalogAndFunctionIdentifier.unapply(u.nameParts) match {
+              case Some((v1FunctionCatalog: SupportV1Function, funcIdent))
+                  // For Paimon built-in functions, Spark will resolve them by itself.
+                  if !isPaimonBuildInFunction(funcIdent) =>
+                // If the function is already registered, avoid a redundant lookup in the
+                // catalog to reduce overhead.
+                if (v1FunctionCatalog.v1FunctionRegistered(funcIdent)) {
+                  UnResolvedPaimonV1Function(v1FunctionCatalog, funcIdent, None, u.arguments)
+                } else {
+                  val function = v1FunctionCatalog.getV1Function(funcIdent)
+                  function.definition(FUNCTION_DEFINITION_NAME) match {
+                    case _: FunctionDefinition.FileFunctionDefinition =>
+                      if (u.isDistinct && u.filter.isDefined) {
+                        throw new UnsupportedOperationException(
+                          s"DISTINCT with FILTER is not supported, func name: $funcIdent")
+ }
+ UnResolvedPaimonV1Function(
+ v1FunctionCatalog,
+ funcIdent,
+ Some(function),
+ u.arguments)
+ case _ => u
+ }
+ }
+ case _ => u
+ }
+ }
+ }
+ }
+
+ private object CatalogAndFunctionIdentifier {
+
+    def unapply(unresolved: LogicalPlan): Option[(CatalogPlugin, FunctionIdentifier)] =
+ unresolved match {
+ case ui: UnresolvedIdentifier =>
+ unapply(ui.nameParts)
+ case name: UnresolvedFunctionName =>
+ unapply(name.multipartIdentifier)
+ case _ =>
+ None
+ }
+
+    def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, FunctionIdentifier)] = {
+      nameParts match {
+        // Spark's built-in functions are referenced without a database or catalog name.
+        case Seq(funName) if isSparkBuiltInFunction(FunctionIdentifier(funName)) =>
+          None
+        case CatalogAndIdentifier(v1FunctionCatalog: SupportV1Function, ident)
+          if v1FunctionCatalog.v1FunctionEnabled() =>
+          Some(v1FunctionCatalog, FunctionIdentifier(ident.name(), Some(ident.namespace().last)))
+ case _ =>
+ None
+ }
+ }
+ }
+
+  private def isPaimonBuildInFunction(funcIdent: FunctionIdentifier): Boolean = {
+ PaimonFunctions.names.contains(funcIdent.funcName)
+ }
+
+  private def isSparkBuiltInFunction(funcIdent: FunctionIdentifier): Boolean = {
+ FunctionRegistry.builtin.functionExists(funcIdent)
+ }
+}
+
+/** An unresolved Paimon V1 function to let Spark resolve the necessary variables. */
+case class UnResolvedPaimonV1Function(
+ v1FunctionCatalog: SupportV1Function,
+ funcIdent: FunctionIdentifier,
+ func: Option[PaimonFunction],
+ arguments: Seq[Expression])
+ extends Expression
+ with Unevaluable {
+
+ override def children: Seq[Expression] = arguments
+
+ override def dataType: DataType = throw new UnresolvedException("dataType")
+
+ override def nullable: Boolean = throw new UnresolvedException("nullable")
+
+ override lazy val resolved = false
+
+ final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_FUNCTION)
+
+ override def prettyName: String = funcIdent.identifier
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[Expression]): UnResolvedPaimonV1Function = {
+ copy(arguments = newChildren)
+ }
+}
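
Taken together, the rule above intercepts the function DDL shown below once
v1Function.enabled is on; the jar path is illustrative, while the UDF class name
comes from the test resources added in this patch:

    spark.sql(
      """CREATE FUNCTION paimon.test.udf_add2
        |AS 'org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2'
        |USING JAR '/path/to/hive-test-udfs.jar'""".stripMargin)
    // Resolved through UnResolvedPaimonV1Function by PaimonFunctionResolver.
    spark.sql("SELECT paimon.test.udf_add2(1, 2)").show()
    spark.sql("DESCRIBE FUNCTION paimon.test.udf_add2").show()
    spark.sql("DROP FUNCTION paimon.test.udf_add2")
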
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala
index 21381cca29..d320def940 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/ClassicApi.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.paimon.shims
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.FunctionResourceLoader
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
import org.apache.spark.sql.execution.SparkPlan
@@ -42,4 +43,5 @@ trait ClassicApi {
relation: LogicalPlan,
columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat])
+ def sessionResourceLoader(session: SparkSession): FunctionResourceLoader
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/resources/function/hive-test-udfs.jar
b/paimon-spark/paimon-spark-ut/src/test/resources/function/hive-test-udfs.jar
new file mode 100644
index 0000000000..a5bfa456f6
Binary files /dev/null and
b/paimon-spark/paimon-spark-ut/src/test/resources/function/hive-test-udfs.jar
differ
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestWithRestCatalogBase.scala
similarity index 92%
rename from
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala
rename to
paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestWithRestCatalogBase.scala
index 124a089031..659e668709 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonRestCatalogSparkTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestWithRestCatalogBase.scala
@@ -30,11 +30,11 @@ import org.assertj.core.api.Assertions
import java.util.UUID
-class PaimonRestCatalogSparkTestBase extends PaimonSparkTestBase {
+class PaimonSparkTestWithRestCatalogBase extends PaimonSparkTestBase {
- private var restCatalogServer: RESTCatalogServer = null
- private var serverUrl: String = null
- private var warehouse: String = null
+ private var restCatalogServer: RESTCatalogServer = _
+ private var serverUrl: String = _
+ private var warehouse: String = _
private val initToken = "init_token"
override protected def beforeAll(): Unit = {
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
index 1db9410a46..e0080f6f9c 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/AlterViewDialectProcedureTest.scala
@@ -18,13 +18,14 @@
package org.apache.paimon.spark.procedure
-import org.apache.paimon.spark.PaimonRestCatalogSparkTestBase
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
import org.apache.paimon.spark.catalog.SupportView
import org.apache.spark.sql.Row
import org.assertj.core.api.Assertions
-class AlterViewDialectProcedureTest extends PaimonRestCatalogSparkTestBase {
+class AlterViewDialectProcedureTest extends PaimonSparkTestWithRestCatalogBase {
+
test(s"test alter view dialect procedure") {
val viewName = "view_test"
spark.sql(s"""
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
index b52022b89a..f3ab65bbd0 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/FunctionProcedureTest.scala
@@ -18,13 +18,14 @@
package org.apache.paimon.spark.procedure
-import org.apache.paimon.spark.PaimonRestCatalogSparkTestBase
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
import org.apache.spark.sql.Row
import org.assertj.core.api.Assertions.assertThat
/** Test for Function Procedure. */
-class FunctionProcedureTest extends PaimonRestCatalogSparkTestBase {
+class FunctionProcedureTest extends PaimonSparkTestWithRestCatalogBase {
+
test(s"test function procedure") {
val functionName = "function_test"
checkAnswer(
@@ -32,14 +33,13 @@ class FunctionProcedureTest extends
PaimonRestCatalogSparkTestBase {
"'[{\"id\": 0, \"name\":\"length\", \"type\":\"INT\"}, {\"id\": 1,
\"name\":\"width\", \"type\":\"INT\"}]'," +
" '[{\"id\": 0, \"name\":\"area\", \"type\":\"BIGINT\"}]', true,
'comment', 'k1=v1,k2=v2')"),
Row(true)
- );
+ )
assertThat(
spark
.sql("SHOW FUNCTIONS")
.collect()
.map(r => r.getString(0))
- .filter(v => v == s"paimon.test.$functionName")
- .size).isEqualTo(1)
+ .count(v => v == s"paimon.test.$functionName")).isEqualTo(1)
checkAnswer(
spark.sql(s"CALL sys.alter_function('$functionName', " +
"'{\"action\" : \"addDefinition\", \"name\" : \"spark\",
\"definition\" : {\"type\" : \"lambda\", \"definition\" : \"(Integer length,
Integer width) -> { return (long) length * width; }\", \"language\": \"JAVA\" }
}')"),
@@ -48,17 +48,16 @@ class FunctionProcedureTest extends
PaimonRestCatalogSparkTestBase {
checkAnswer(
spark.sql(s"select paimon.test.$functionName(1, 2)"),
Row(2)
- );
+ )
checkAnswer(
spark.sql(s"CALL sys.drop_function('$functionName')"),
Row(true)
- );
+ )
assertThat(
spark
.sql("SHOW FUNCTIONS")
.collect()
.map(r => r.getString(0))
- .filter(v => v == s"paimon.test.$functionName")
- .size).isEqualTo(0);
+      .count(v => v == s"paimon.test.$functionName")).isEqualTo(0)
}
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
new file mode 100644
index 0000000000..5974d9809a
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
+import org.apache.paimon.spark.sql.FunctionResources._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBase {
+
+ test("Paimon V1 Function: create or replace function") {
+ withUserDefinedFunction("udf_add2" -> false) {
+ sql(s"""
+ |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ checkAnswer(sql("SELECT udf_add2(1, 2)"), Seq(Row(3)))
+
+ // create again should throw exception
+ intercept[Exception] {
+ sql(s"""
+ |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ }
+
+ sql(s"""
+ |CREATE FUNCTION IF NOT EXISTS udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+
+ // create or replace
+ sql(s"""
+ |CREATE OR REPLACE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ checkAnswer(sql("SELECT udf_add2(3, 4)"), Seq(Row(7)))
+ }
+ }
+
+ test("Paimon V1 Function: use function with catalog name and database name")
{
+ withUserDefinedFunction("udf_add2" -> false) {
+ sql(s"""
+ |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ checkAnswer(sql("SELECT udf_add2(3, 4)"), Seq(Row(7)))
+ sql("DROP FUNCTION udf_add2")
+
+ sql(s"""
+ |CREATE FUNCTION test.udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ checkAnswer(sql("SELECT test.udf_add2(3, 4)"), Seq(Row(7)))
+ sql("DROP FUNCTION test.udf_add2")
+
+ sql(s"""
+ |CREATE FUNCTION paimon.test.udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ checkAnswer(sql("SELECT paimon.test.udf_add2(3, 4)"), Seq(Row(7)))
+ sql("DROP FUNCTION paimon.test.udf_add2")
+ }
+ }
+
+ test("Paimon V1 Function: select with attribute") {
+ withUserDefinedFunction("udf_add2" -> false) {
+ sql(s"""
+ |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ withTable("t") {
+ sql("CREATE TABLE t (a INT, b INT)")
+ sql("INSERT INTO t VALUES (1, 2), (3, 4)")
+ checkAnswer(sql("SELECT udf_add2(a, b) FROM t"), Seq(Row(3), Row(7)))
+ }
+ }
+ }
+
+ test("Paimon V1 Function: select with build-in function") {
+ withUserDefinedFunction("udf_add2" -> false) {
+ sql(s"""
+ |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ withTable("t") {
+ sql("CREATE TABLE t (a INT, pt INT) USING paimon PARTITIONED BY (pt)")
+ sql("INSERT INTO t VALUES (1, 2), (3, 4)")
+ checkAnswer(
+ sql(
+ "SELECT a, udf_add2(pow(a, pt), max_pt('t')), pow(a, udf_add2(a,
pt)) FROM t ORDER BY a"),
+ Seq(Row(1, 5.0d, 1.0d), Row(3, 85.0d, 2187.0d))
+ )
+ }
+ }
+ }
+
+ test("Paimon V1 Function: drop function") {
+ withUserDefinedFunction("udf_add2" -> false) {
+ sql(s"""
+ |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ checkAnswer(sql("SELECT udf_add2(1, 2)"), Seq(Row(3)))
+
+ sql("DROP FUNCTION udf_add2")
+ intercept[Exception] {
+ sql("SELECT udf_add2(3, 4)")
+ }
+ sql("DROP FUNCTION IF EXISTS udf_add2")
+ }
+ }
+
+ test("Paimon V1 Function: describe function") {
+ withUserDefinedFunction("udf_add2" -> false) {
+ sql(s"""
+ |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+
+ checkAnswer(
+ sql("DESCRIBE FUNCTION udf_add2"),
+ Seq(Row("Function: test.udf_add2"), Row(s"Class:
$UDFExampleAdd2Class"))
+ )
+ }
+ }
+
+ test("Paimon V1 Function: unsupported operation") {
+    // create a built-in function
+ intercept[Exception] {
+ sql(s"""
+ |CREATE FUNCTION max_pt AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ }
+
+    // drop a built-in function
+ intercept[Exception] {
+ sql("DROP FUNCTION max_pt")
+ }
+ }
+}
+
+class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.sql.catalog.paimon.v1Function.enabled", "false")
+ }
+
+ test("Paimon V1 Function: disable paimon v1 function") {
+ intercept[Exception] {
+ sql(s"""
+ |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ }
+ }
+}
+
+object FunctionResources {
+
+  val testUDFJarPath: String = getClass.getClassLoader.getResource("function/hive-test-udfs.jar").getPath
+
+  val UDFExampleAdd2Class: String = "org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd2"
+}
diff --git
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala
index b0782c59d6..e0ff2caf06 100644
---
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala
+++
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic3Api.scala
@@ -19,10 +19,12 @@
package org.apache.spark.sql.paimon.shims
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.FunctionResourceLoader
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.internal.SessionResourceLoader
class Classic3Api extends ClassicApi {
override def column(expression: Expression): Column = new Column(expression)
@@ -52,4 +54,7 @@ class Classic3Api extends ClassicApi {
columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) =
CommandUtils.computeColumnStats(spark, relation, columns)
+  override def sessionResourceLoader(session: SparkSession): FunctionResourceLoader = {
+ new SessionResourceLoader(session)
+ }
}
diff --git
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala
index 8dff78f0bc..860ecc28d3 100644
---
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala
+++
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Classic4Api.scala
@@ -19,11 +19,13 @@
package org.apache.spark.sql.paimon.shims
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.FunctionResourceLoader
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
import org.apache.spark.sql.classic.{ClassicConversions, Dataset =>
ClassicDataset, ExpressionUtils}
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.internal.SessionResourceLoader
/**
* This class is used to implement the conversion from sql-api to classic one.
Make sure this is the
@@ -60,4 +62,8 @@ class Classic4Api extends ClassicApi with ClassicConversions {
columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
CommandUtils.computeColumnStats(spark, relation, columns.toSeq)
}
+
+  override def sessionResourceLoader(session: SparkSession): FunctionResourceLoader = {
+ new SessionResourceLoader(session)
+ }
}
diff --git a/paimon-spark/pom.xml b/paimon-spark/pom.xml
index 49823bf4b5..a546d8d889 100644
--- a/paimon-spark/pom.xml
+++ b/paimon-spark/pom.xml
@@ -115,6 +115,21 @@ under the License.
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>