This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6f4ad34fd7 [spark] Add max_pt function (#5088)
6f4ad34fd7 is described below
commit 6f4ad34fd7e4e53ea6964e0382c8eb3371a2116e
Author: Xiduo You <[email protected]>
AuthorDate: Sun Feb 16 21:21:05 2025 +0800
[spark] Add max_pt function (#5088)
---
docs/content/spark/sql-functions.md | 55 +++++++++++++
.../paimon/spark/catalog/SupportFunction.java | 15 +---
.../spark/catalog/functions/PaimonFunctions.java | 63 ++++++++++++++-
.../paimon/spark/PaimonPartitionManagement.scala | 2 +-
.../catalyst/analysis/ReplacePaimonFunctions.scala | 94 ++++++++++++++++++++++
.../extensions/PaimonSparkSessionExtensions.scala | 3 +-
.../paimon/spark/sql/PaimonFunctionTest.scala | 65 +++++++++++++++
7 files changed, 283 insertions(+), 14 deletions(-)
diff --git a/docs/content/spark/sql-functions.md b/docs/content/spark/sql-functions.md
new file mode 100644
index 0000000000..182f6be518
--- /dev/null
+++ b/docs/content/spark/sql-functions.md
@@ -0,0 +1,55 @@
+---
+title: "SQL Functions"
+weight: 2
+type: docs
+aliases:
+- /spark/sql-functions.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# SQL Functions
+
+This section introduces all available Paimon Spark functions.
+
+
+## max_pt
+
+`max_pt($table_name)`
+
+It accepts a string literal that specifies the table name and returns the maximum valid top-level partition value.
+- **valid**: only partitions that contain data files are considered
+- **top-level**: if the table has multiple partition columns, only the first partition column's value is returned (see the examples below)
+
+It throws an exception when:
+- the table is not a partitioned table
+- the partitioned table has no partitions
+- none of the partitions contain data files
+
+**Example**
+
+```sql
+> SELECT max_pt('t');
+ 20250101
+
+> SELECT * FROM t WHERE pt = max_pt('t');
+ a, 20250101
+```
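+
+For a table with multiple partition columns, only the first-level partition value is returned. For example, assuming a table `t2` partitioned by `(p1, p2)` with data only in partitions `(1, 'c')`, `(2, 'a')` and `(2, 'b')`:
+
+```sql
+> SELECT max_pt('t2');
+  2
+```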
+
+**Since: 1.1.0**
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
index bb16bbf1c7..d3fc3528bb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
@@ -27,19 +27,14 @@ import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
-import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
-
/** Catalog methods for working with Functions. */
public interface SupportFunction extends FunctionCatalog, SupportsNamespaces {
- static boolean isFunctionNamespace(String[] namespace) {
+ default boolean isFunctionNamespace(String[] namespace) {
// Allow for empty namespace, as Spark's bucket join will use `bucket` function with empty
- // namespace to generate transforms for partitioning. Otherwise, use `sys` namespace.
- return namespace.length == 0 || isSystemNamespace(namespace);
- }
-
- static boolean isSystemNamespace(String[] namespace) {
- return namespace.length == 1 && namespace[0].equalsIgnoreCase(SYSTEM_DATABASE_NAME);
+ // namespace to generate transforms for partitioning.
+ // Otherwise, check if it is a Paimon namespace.
+ return namespace.length == 0 || (namespace.length == 1 && namespaceExists(namespace));
}
@Override
@@ -48,8 +43,6 @@ public interface SupportFunction extends FunctionCatalog, SupportsNamespaces {
return PaimonFunctions.names().stream()
.map(name -> Identifier.of(namespace, name))
.toArray(Identifier[]::new);
- } else if (namespaceExists(namespace)) {
- return new Identifier[0];
}
throw new NoSuchNamespaceException(namespace);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java
index 46be3f2fa5..c7949e1194 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java
@@ -21,7 +21,9 @@ package org.apache.paimon.spark.catalog.functions;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
@@ -34,12 +36,15 @@ import java.util.Map;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
/** Paimon functions. */
public class PaimonFunctions {
private static final Map<String, UnboundFunction> FUNCTIONS =
- ImmutableMap.of("bucket", new PaimonFunctions.BucketFunction());
+ ImmutableMap.of(
+ "bucket", new BucketFunction(),
+ "max_pt", new MaxPtFunction());
private static final List<String> FUNCTION_NAMES = ImmutableList.copyOf(FUNCTIONS.keySet());
@@ -105,4 +110,60 @@ public class PaimonFunctions {
return "bucket";
}
}
+
+ /**
+ * For partitioned tables, this function returns the maximum value of the first-level partition
+ * of the partitioned table, sorted alphabetically. Note that empty partitions are skipped, for
+ * example, a partition created by `alter table ... add partition ...`.
+ */
+ public static class MaxPtFunction implements UnboundFunction {
+ @Override
+ public BoundFunction bind(StructType inputType) {
+ if (inputType.fields().length != 1) {
+ throw new UnsupportedOperationException(
+ "Wrong number of inputs, expected 1 but got " +
inputType.fields().length);
+ }
+ StructField identifier = inputType.fields()[0];
+ checkArgument(identifier.dataType() == StringType, "table name must be string type");
+
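+ // Only the argument type is validated here; `ReplacePaimonFunctions` additionally
+ // requires the table name to be a string literal and resolves it at analysis time.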
+ return new ScalarFunction<String>() {
+ @Override
+ public DataType[] inputTypes() {
+ return new DataType[] {identifier.dataType()};
+ }
+
+ @Override
+ public DataType resultType() {
+ return StringType;
+ }
+
+ @Override
+ public String produceResult(InternalRow input) {
+ // There is no need to implement `produceResult`,
+ // since `ReplacePaimonFunctions` replaces the call with a partition literal.
+ throw new IllegalStateException("This method should not be called");
+ }
+
+ @Override
+ public String name() {
+ return "max_pt";
+ }
+
+ @Override
+ public String canonicalName() {
+ return "paimon.max_pt(" +
identifier.dataType().catalogString() + ")";
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return name();
+ }
+
+ @Override
+ public String name() {
+ return "max_pt";
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 5a6abfe284..1817add879 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -38,7 +38,7 @@ import scala.collection.JavaConverters._
trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
self: SparkTable =>
- private lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)
+ lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)
override lazy val partitionSchema: StructType =
SparkTypeUtils.fromPaimonRowType(partitionRowType)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala
new file mode 100644
index 0000000000..d3650d27f8
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.spark.{DataConverter, SparkTable, SparkTypeUtils, SparkUtils}
+import org.apache.paimon.spark.catalog.SparkBaseCatalog
+import org.apache.paimon.utils.{InternalRowUtils, TypeUtils}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.jdk.CollectionConverters._
+
+/** A rule to replace Paimon functions with literal values. */
+case class ReplacePaimonFunctions(spark: SparkSession) extends Rule[LogicalPlan] {
+ private def replaceMaxPt(func: ApplyFunctionExpression): Expression = {
+ assert(func.children.size == 1)
+ assert(func.children.head.dataType == StringType)
+ if (!func.children.head.isInstanceOf[Literal]) {
+ throw new UnsupportedOperationException("Table name must be a literal")
+ }
+ val tableName = func.children.head.eval().asInstanceOf[UTF8String]
+ if (tableName == null) {
+ throw new UnsupportedOperationException("Table name cannot be null")
+ }
+ val catalogAndIdentifier = SparkUtils
+ .catalogAndIdentifier(
+ spark,
+ tableName.toString,
+ spark.sessionState.catalogManager.currentCatalog)
+ if (!catalogAndIdentifier.catalog().isInstanceOf[SparkBaseCatalog]) {
+ throw new UnsupportedOperationException(s"${catalogAndIdentifier.catalog()} is not a Paimon catalog")
+ }
+
+ val table =
+ catalogAndIdentifier.catalog.asTableCatalog.loadTable(catalogAndIdentifier.identifier())
+ assert(table.isInstanceOf[SparkTable])
+ val sparkTable = table.asInstanceOf[SparkTable]
+ if (sparkTable.table.partitionKeys().size() == 0) {
+ throw new UnsupportedOperationException(s"$table is not a partitioned
table")
+ }
+
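+ // List the partition entries, skip partitions without data files, sort the
+ // first-level values ascending, and take the last one (the maximum).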
+ val toplevelPartitionType =
+ TypeUtils.project(sparkTable.table.rowType, sparkTable.table.partitionKeys()).getTypeAt(0)
+ val partitionValues = sparkTable.table.newReadBuilder.newScan
+ .listPartitionEntries()
+ .asScala
+ .filter(_.fileCount() > 0)
+ .map {
+ partitionEntry => InternalRowUtils.get(partitionEntry.partition(), 0, toplevelPartitionType)
+ }
+ .sortWith(InternalRowUtils.compare(_, _, toplevelPartitionType.getTypeRoot) < 0)
+ .map(DataConverter.fromPaimon(_, toplevelPartitionType))
+ if (partitionValues.isEmpty) {
+ throw new UnsupportedOperationException(s"$table has no partitions or none of the partitions have any data")
+ }
+
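+ // Convert the maximum partition value to a Spark literal and cast it to the
+ // function's declared string result type.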
+ val sparkType = SparkTypeUtils.fromPaimonType(toplevelPartitionType)
+ val literal = Literal(partitionValues.last, sparkType)
+ Cast(literal, func.dataType)
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.resolveExpressions {
+ case func: ApplyFunctionExpression
+ if func.function.name() == "max_pt" &&
+ func.function.canonicalName().startsWith("paimon") =>
+ replaceMaxPt(func)
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index f73df64fb8..bfd337580d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.extensions
-import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver}
+import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions}
import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries}
import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
import org.apache.paimon.spark.execution.PaimonStrategy
@@ -44,6 +44,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
spark => SparkShimLoader.getSparkShim.createCustomResolution(spark))
extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark))
+ extensions.injectPostHocResolutionRule(spark => ReplacePaimonFunctions(spark))
extensions.injectPostHocResolutionRule(spark => PaimonPostHocResolutionRules(spark))
extensions.injectPostHocResolutionRule(spark => PaimonIncompatiblePHRRules(spark))
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
index bb48983a85..4b0e63731d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
@@ -66,6 +66,16 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
}
}
+ test("Paimon function: show user functions") {
+ assume(gteqSpark3_4)
+ Seq("paimon", paimonHiveCatalogName).foreach {
+ catalogName =>
+ sql(s"use $catalogName")
+ val functions = sql("show user functions").collect()
+ assert(functions.exists(_.getString(0).contains("max_pt")),
catalogName)
+ }
+ }
+
test("Paimon function: bucket join with SparkGenericCatalog") {
sql(s"use $sparkCatalogName")
assume(gteqSpark3_3)
@@ -105,6 +115,61 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
sql("DROP FUNCTION myIntSum")
checkAnswer(sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"),
Seq.empty)
}
+
+ test("Add max_pt function") {
+ Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
+ catalogName =>
+ {
+ sql(s"use $catalogName")
+ val maxPt = if (catalogName == sparkCatalogName) {
+ "paimon.max_pt"
+ } else {
+ "max_pt"
+ }
+
+ intercept[Exception] {
+ sql(s"SELECT $maxPt(1)").collect()
+ }
+ intercept[Exception] {
+ sql(s"SELECT $maxPt()").collect()
+ }
+ withTable("t") {
+ sql("CREATE TABLE t (id INT) USING paimon")
+ intercept[Exception] {
+ sql(s"SELECT $maxPt('t')").collect()
+ }
+ }
+
+ withTable("t") {
+ sql("CREATE TABLE t (id INT) USING paimon PARTITIONED BY (p1
STRING)")
+ intercept[Exception] {
+ sql(s"SELECT $maxPt('t')").collect()
+ }
+ sql("INSERT INTO t PARTITION (p1='a') VALUES (1)")
+ sql("INSERT INTO t PARTITION (p1='b') VALUES (2)")
+ sql("INSERT INTO t PARTITION (p1='aa') VALUES (3)")
+ sql("ALTER TABLE t ADD PARTITION (p1='z')")
+ checkAnswer(sql(s"SELECT $maxPt('t')"), Row("b"))
+ checkAnswer(sql(s"SELECT id FROM t WHERE p1 =
$maxPt('default.t')"), Row(2))
+ }
+
+ withTable("t") {
+ sql("CREATE TABLE t (id INT) USING paimon PARTITIONED BY (p1 INT,
p2 STRING)")
+ intercept[Exception] {
+ sql(s"SELECT $maxPt('t')").collect()
+ }
+ sql("INSERT INTO t PARTITION (p1=1, p2='c') VALUES (1)")
+ sql("INSERT INTO t PARTITION (p1=2, p2='a') VALUES (2)")
+ sql("INSERT INTO t PARTITION (p1=2, p2='b') VALUES (3)")
+ sql("ALTER TABLE t ADD PARTITION (p1='9', p2='z')")
+ checkAnswer(sql(s"SELECT $maxPt('t')"), Row("2"))
+ checkAnswer(
+ sql(s"SELECT id FROM t WHERE p1 = $maxPt('default.t')"),
+ Row(2) :: Row(3) :: Nil)
+ }
+ }
+ }
+ }
}
private class MyIntSum extends UserDefinedAggregateFunction {