This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7bbed67726 [spark] System function max_pt should be used as
'sys.max_pt' (#6312)
7bbed67726 is described below
commit 7bbed677261756140653df0f810b24c8297575db
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 24 00:37:04 2025 +0800
[spark] System function max_pt should be used as 'sys.max_pt' (#6312)
---
docs/content/spark/sql-functions.md | 9 ++---
.../java/org/apache/paimon/spark/SparkCatalog.java | 42 +++++++++++----------
.../paimon/spark/catalog/SparkBaseCatalog.java | 10 +++--
.../paimon/spark/SparkCatalogWithRestTest.java | 3 ++
.../catalog/functions/BucketFunctionTest.java | 4 +-
.../spark/benchmark/BucketFunctionBenchmark.scala | 44 ----------------------
.../paimon/spark/sql/PaimonFunctionTest.scala | 6 +--
.../spark/sql/PaimonV1FunctionTestBase.scala | 2 +-
8 files changed, 41 insertions(+), 79 deletions(-)
diff --git a/docs/content/spark/sql-functions.md
b/docs/content/spark/sql-functions.md
index 65019ac5fd..99f4dcb153 100644
--- a/docs/content/spark/sql-functions.md
+++ b/docs/content/spark/sql-functions.md
@@ -28,12 +28,11 @@ under the License.
This section introduces all available Paimon Spark functions.
-
## Built-in Function
### max_pt
-`max_pt($table_name)`
+`sys.max_pt($table_name)`
It accepts a string literal that specifies the table name and returns the
max-valid-toplevel partition value.
- **valid**: the partition which contains data files
@@ -47,15 +46,13 @@ It would throw exception when:
**Example**
```sql
-SELECT max_pt('t');
+SELECT sys.max_pt('t');
-- 20250101
-SELECT * FROM t where pt = max_pt('t');
+SELECT * FROM t where pt = sys.max_pt('t');
-- a, 20250101
```
-**Since: 1.1.0**
-
## User-defined Function
Paimon Spark supports two types of user-defined functions: lambda functions
and file-based functions.
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 1df1d66f80..db7281de92 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -524,34 +524,33 @@ public class SparkCatalog extends SparkBaseCatalog
@Override
public Identifier[] listFunctions(String[] namespace) throws
NoSuchNamespaceException {
- if (isFunctionNamespace(namespace)) {
- List<Identifier> functionIdentifiers = new ArrayList<>();
- PaimonFunctions.names()
- .forEach(name ->
functionIdentifiers.add(Identifier.of(namespace, name)));
- if (namespace.length > 0) {
- String databaseName = getDatabaseNameFromNamespace(namespace);
- try {
- catalog.listFunctions(databaseName)
- .forEach(
- name ->
- functionIdentifiers.add(
- Identifier.of(namespace,
name)));
- } catch (Catalog.DatabaseNotExistException e) {
- throw new NoSuchNamespaceException(namespace);
- }
+ if (isSystemFunctionNamespace(namespace)) {
+ List<Identifier> result = new ArrayList<>();
+ PaimonFunctions.names().forEach(name ->
result.add(Identifier.of(namespace, name)));
+ return result.toArray(new Identifier[0]);
+ } else if (isDatabaseFunctionNamespace(namespace)) {
+ List<Identifier> result = new ArrayList<>();
+ String databaseName = getDatabaseNameFromNamespace(namespace);
+ try {
+ catalog.listFunctions(databaseName)
+ .forEach(name -> result.add(Identifier.of(namespace,
name)));
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new NoSuchNamespaceException(namespace);
}
- return functionIdentifiers.toArray(new Identifier[0]);
+ return result.toArray(new Identifier[0]);
}
throw new NoSuchNamespaceException(namespace);
}
@Override
public UnboundFunction loadFunction(Identifier ident) throws
NoSuchFunctionException {
- if (isFunctionNamespace(ident.namespace())) {
+ String[] namespace = ident.namespace();
+ if (isSystemFunctionNamespace(namespace)) {
UnboundFunction func = PaimonFunctions.load(ident.name());
if (func != null) {
return func;
}
+ } else if (isDatabaseFunctionNamespace(namespace)) {
try {
Function paimonFunction =
catalog.getFunction(toIdentifier(ident));
FunctionDefinition functionDefinition =
@@ -582,11 +581,14 @@ public class SparkCatalog extends SparkBaseCatalog
throw new NoSuchFunctionException(ident);
}
- private boolean isFunctionNamespace(String[] namespace) {
+ private boolean isSystemFunctionNamespace(String[] namespace) {
// Allow for empty namespace, as Spark's bucket join will use `bucket`
function with empty
// namespace to generate transforms for partitioning.
- // Otherwise, check if it is paimon namespace.
- return namespace.length == 0 || (namespace.length == 1 &&
namespaceExists(namespace));
+ return namespace.length == 0 || isSystemNamespace(namespace);
+ }
+
+ private boolean isDatabaseFunctionNamespace(String[] namespace) {
+ return namespace.length == 1 && namespaceExists(namespace);
}
private PaimonV1FunctionRegistry v1FunctionRegistry() {
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
index 1cb3035fad..ac6736e2e1 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SparkBaseCatalog.java
@@ -18,7 +18,6 @@
package org.apache.paimon.spark.catalog;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.spark.SparkProcedures;
import org.apache.paimon.spark.SparkSource;
import org.apache.paimon.spark.analysis.NoSuchProcedureException;
@@ -35,6 +34,7 @@ import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Set;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static
org.apache.spark.sql.connector.catalog.TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE;
/** Spark base catalog. */
@@ -54,7 +54,7 @@ public abstract class SparkBaseCatalog
@Override
public Procedure loadProcedure(Identifier identifier) throws
NoSuchProcedureException {
- if (Catalog.SYSTEM_DATABASE_NAME.equals(identifier.namespace()[0])) {
+ if (isSystemNamespace(identifier.namespace())) {
ProcedureBuilder builder =
SparkProcedures.newBuilder(identifier.name());
if (builder != null) {
return builder.withTableCatalog(this).build();
@@ -63,7 +63,11 @@ public abstract class SparkBaseCatalog
throw new NoSuchProcedureException(identifier);
}
- public boolean usePaimon(@Nullable String provider) {
+ public static boolean usePaimon(@Nullable String provider) {
return provider == null ||
SparkSource.NAME().equalsIgnoreCase(provider);
}
+
+ public static boolean isSystemNamespace(String[] namespace) {
+ return namespace.length == 1 &&
namespace[0].equalsIgnoreCase(SYSTEM_DATABASE_NAME);
+ }
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index 575eb72711..ee8978c687 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -165,6 +165,9 @@ public class SparkCatalogWithRestTest {
.get(0)
.toString())
.isEqualTo("[3]");
+ assertThat(spark.sql("show user functions").collectAsList().toString())
+ .contains("[paimon.db2.area_func]");
+
paimonCatalog.dropFunction(identifier, false);
cleanFunction(functionName);
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
index b8fbcdae42..214965bf15 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/catalog/functions/BucketFunctionTest.java
@@ -237,7 +237,7 @@ public class BucketFunctionTest {
setupTable(bucketColumns);
spark.sql(
String.format(
- "SELECT id_col, __paimon_bucket as
expected_bucket, paimon.bucket(%s, %s) FROM %s",
+ "SELECT id_col, __paimon_bucket as
expected_bucket, paimon.sys.bucket(%s, %s) FROM %s",
NUM_BUCKETS, String.join(",", bucketColumns),
TABLE_NAME))
.collectAsList()
.forEach(row ->
Assertions.assertThat(row.getInt(2)).isEqualTo(row.get(1)));
@@ -328,7 +328,7 @@ public class BucketFunctionTest {
setupTable(TIMESTAMP_COL_PRECISION_3);
spark.sql(
String.format(
- "SELECT id_col, __paimon_bucket as
expected_bucket, paimon.bucket(%s, %s) FROM %s",
+ "SELECT id_col, __paimon_bucket as
expected_bucket, paimon.sys.bucket(%s, %s) FROM %s",
NUM_BUCKETS,
String.join(",", TIMESTAMP_COL_PRECISION_3),
TABLE_NAME))
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala
deleted file mode 100644
index 1ba618c6a1..0000000000
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/benchmark/BucketFunctionBenchmark.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.benchmark
-
-import org.apache.spark.sql.paimon.PaimonBenchmark
-
-object BucketFunctionBenchmark extends PaimonSqlBasedBenchmark {
-
- private val N = 20L * 1000 * 1000
-
- override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
- val benchmark = PaimonBenchmark(s"Bucket function", N, output = output)
-
- benchmark.addCase("Single int column", 3) {
- _ => spark.range(N).selectExpr("fixed_bucket(10, id)").noop()
- }
-
- benchmark.addCase("Single string column", 3) {
- _ => spark.range(N).selectExpr("fixed_bucket(10, uuid())").noop()
- }
-
- benchmark.addCase("Multiple columns", 3) {
- _ => spark.range(N).selectExpr("fixed_bucket(10, id, uuid(),
uuid())").noop()
- }
-
- benchmark.run()
- }
-}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
index b1263cd8d2..765eb136bb 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
@@ -69,7 +69,7 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
Seq("paimon", paimonHiveCatalogName).foreach {
catalogName =>
sql(s"use $catalogName")
- val functions = sql("show user functions").collect()
+ val functions = sql("show user functions in sys").collect()
assert(functions.exists(_.getString(0).contains("max_pt")),
catalogName)
}
}
@@ -120,9 +120,9 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
{
sql(s"use $catalogName")
val maxPt = if (catalogName == sparkCatalogName) {
- "paimon.max_pt"
+ "paimon.sys.max_pt"
} else {
- "max_pt"
+ "sys.max_pt"
}
intercept[Exception] {
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
index 743b36f8dd..eec9a1acb7 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
@@ -106,7 +106,7 @@ abstract class PaimonV1FunctionTestBase extends
PaimonSparkTestWithRestCatalogBa
sql("INSERT INTO t VALUES (1, 2), (3, 4)")
checkAnswer(
sql(
- "SELECT a, udf_add2(pow(a, pt), max_pt('t')), pow(a, udf_add2(a,
pt)) FROM t ORDER BY a"),
+ "SELECT a, udf_add2(pow(a, pt), sys.max_pt('t')), pow(a,
udf_add2(a, pt)) FROM t ORDER BY a"),
Seq(Row(1, 5.0d, 1.0d), Row(3, 85.0d, 2187.0d))
)
}