This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f4ad34fd7 [spark] Add max_pt function (#5088)
6f4ad34fd7 is described below

commit 6f4ad34fd7e4e53ea6964e0382c8eb3371a2116e
Author: Xiduo You <[email protected]>
AuthorDate: Sun Feb 16 21:21:05 2025 +0800

    [spark] Add max_pt function (#5088)
---
 docs/content/spark/sql-functions.md                | 55 +++++++++++++
 .../paimon/spark/catalog/SupportFunction.java      | 15 +---
 .../spark/catalog/functions/PaimonFunctions.java   | 63 ++++++++++++++-
 .../paimon/spark/PaimonPartitionManagement.scala   |  2 +-
 .../catalyst/analysis/ReplacePaimonFunctions.scala | 94 ++++++++++++++++++++++
 .../extensions/PaimonSparkSessionExtensions.scala  |  3 +-
 .../paimon/spark/sql/PaimonFunctionTest.scala      | 65 +++++++++++++++
 7 files changed, 283 insertions(+), 14 deletions(-)

diff --git a/docs/content/spark/sql-functions.md b/docs/content/spark/sql-functions.md
new file mode 100644
index 0000000000..182f6be518
--- /dev/null
+++ b/docs/content/spark/sql-functions.md
@@ -0,0 +1,55 @@
+---
+title: "SQL Functions"
+weight: 2
+type: docs
+aliases:
+- /spark/sql-functions.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# SQL Functions
+
+This section introduces all available Paimon Spark functions.
+
+
+## max_pt
+
+`max_pt($table_name)`
+
+It accepts a string literal specifying the table name and returns the maximum valid top-level partition value.
+- **valid**: a partition that contains data files
+- **top-level**: if the table has multiple partition columns, only the first partition column's value is returned
+
+It throws an exception when:
+- the table is not a partitioned table
+- the partitioned table has no partitions
+- none of the partitions contain any data files
+
+**Example**
+
+```sql
+> SELECT max_pt('t');
+ 20250101
+ 
+> SELECT * FROM t WHERE pt = max_pt('t');
+ a, 20250101
+```
+
+**Since: 1.1.0**
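
For the multi-partition behavior documented above, a hedged Scala sketch (table name and data hypothetical, mirroring the test added at the end of this commit; assumes an active `spark` session with the Paimon extensions enabled):

```scala
// Only the first-level partition value is considered, and partitions without
// data files (e.g. created via ALTER TABLE ... ADD PARTITION) are skipped.
spark.sql("CREATE TABLE t2 (id INT) USING paimon PARTITIONED BY (p1 INT, p2 STRING)")
spark.sql("INSERT INTO t2 PARTITION (p1=1, p2='c') VALUES (1)")
spark.sql("INSERT INTO t2 PARTITION (p1=2, p2='a') VALUES (2)")
spark.sql("ALTER TABLE t2 ADD PARTITION (p1=9, p2='z')") // empty partition, skipped
spark.sql("SELECT max_pt('t2')").show() // "2": p2 is ignored, p1=9 has no data
```
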
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
index bb16bbf1c7..d3fc3528bb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java
@@ -27,19 +27,14 @@ import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 
-import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
-
 /** Catalog methods for working with Functions. */
 public interface SupportFunction extends FunctionCatalog, SupportsNamespaces {
 
-    static boolean isFunctionNamespace(String[] namespace) {
+    default boolean isFunctionNamespace(String[] namespace) {
         // Allow for empty namespace, as Spark's bucket join will use `bucket` function with empty
-        // namespace to generate transforms for partitioning. Otherwise, use `sys` namespace.
-        return namespace.length == 0 || isSystemNamespace(namespace);
-    }
-
-    static boolean isSystemNamespace(String[] namespace) {
-        return namespace.length == 1 && namespace[0].equalsIgnoreCase(SYSTEM_DATABASE_NAME);
+        // namespace to generate transforms for partitioning.
+        // Otherwise, check if it is paimon namespace.
+        return namespace.length == 0 || (namespace.length == 1 && namespaceExists(namespace));
     }
 
     @Override
@@ -48,8 +43,6 @@ public interface SupportFunction extends FunctionCatalog, SupportsNamespaces {
             return PaimonFunctions.names().stream()
                     .map(name -> Identifier.of(namespace, name))
                     .toArray(Identifier[]::new);
-        } else if (namespaceExists(namespace)) {
-            return new Identifier[0];
         }
 
         throw new NoSuchNamespaceException(namespace);
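
With the namespace check above relaxed from the fixed `sys` database to any existing Paimon namespace, the built-in function names are listed from regular databases as well. A minimal sketch (catalog and database names hypothetical; the added test guards this with Spark 3.4+):

```scala
// SHOW USER FUNCTIONS now surfaces Paimon's built-ins (`bucket`, `max_pt`)
// from any existing database of a Paimon catalog, not only under `sys`.
spark.sql("USE paimon.default")
spark.sql("SHOW USER FUNCTIONS").show(false)
```
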
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java
index 46be3f2fa5..c7949e1194 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/functions/PaimonFunctions.java
@@ -21,7 +21,9 @@ package org.apache.paimon.spark.catalog.functions;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
+import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.StructField;
@@ -34,12 +36,15 @@ import java.util.Map;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /** Paimon functions. */
 public class PaimonFunctions {
 
     private static final Map<String, UnboundFunction> FUNCTIONS =
-            ImmutableMap.of("bucket", new PaimonFunctions.BucketFunction());
+            ImmutableMap.of(
+                    "bucket", new BucketFunction(),
+                    "max_pt", new MaxPtFunction());
 
     private static final List<String> FUNCTION_NAMES = ImmutableList.copyOf(FUNCTIONS.keySet());
 
@@ -105,4 +110,60 @@ public class PaimonFunctions {
             return "bucket";
         }
     }
+
+    /**
+     * For partitioned tables, this function returns the maximum value of the first level partition
+     * of the partitioned table, sorted alphabetically. Note, empty partitions will be skipped. For
+     * example, a partition created by `alter table ... add partition ...`.
+     */
+    public static class MaxPtFunction implements UnboundFunction {
+        @Override
+        public BoundFunction bind(StructType inputType) {
+            if (inputType.fields().length != 1) {
+                throw new UnsupportedOperationException(
+                        "Wrong number of inputs, expected 1 but got " + inputType.fields().length);
+            }
+            StructField identifier = inputType.fields()[0];
+            checkArgument(identifier.dataType() == StringType, "table name must be string type");
+
+            return new ScalarFunction<String>() {
+                @Override
+                public DataType[] inputTypes() {
+                    return new DataType[] {identifier.dataType()};
+                }
+
+                @Override
+                public DataType resultType() {
+                    return StringType;
+                }
+
+                @Override
+                public String produceResult(InternalRow input) {
+                    // Does not need to implement the `produceResult` method,
+                    // since `ReplacePaimonFunctions` will replace it with partition literal.
+                    throw new IllegalStateException("This method should not be called");
+                }
+
+                @Override
+                public String name() {
+                    return "max_pt";
+                }
+
+                @Override
+                public String canonicalName() {
+                    return "paimon.max_pt(" + identifier.dataType().catalogString() + ")";
+                }
+            };
+        }
+
+        @Override
+        public String description() {
+            return name();
+        }
+
+        @Override
+        public String name() {
+            return "max_pt";
+        }
+    }
 }
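
For reference, a minimal sketch of how Spark's `FunctionCatalog` API drives the class above (catalog and database names hypothetical): the catalog hands back the `UnboundFunction`, Spark binds it against the argument schema, and `produceResult` is never invoked because the analyzer rule below folds the call away first.

```scala
import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumes `spark` has a Paimon catalog registered under the name "paimon".
val functionCatalog = spark.sessionState.catalogManager
  .catalog("paimon")
  .asInstanceOf[FunctionCatalog]
val unbound = functionCatalog.loadFunction(Identifier.of(Array("default"), "max_pt"))
// bind() validates the single string-typed argument, as implemented above.
val bound = unbound.bind(StructType(Array(StructField("table", StringType))))
assert(bound.name() == "max_pt")
```
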
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 5a6abfe284..1817add879 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -38,7 +38,7 @@ import scala.collection.JavaConverters._
 trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
   self: SparkTable =>
 
-  private lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)
+  lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)
 
   override lazy val partitionSchema: StructType = SparkTypeUtils.fromPaimonRowType(partitionRowType)
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala
new file mode 100644
index 0000000000..d3650d27f8
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/ReplacePaimonFunctions.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.spark.{DataConverter, SparkTable, SparkTypeUtils, SparkUtils}
+import org.apache.paimon.spark.catalog.SparkBaseCatalog
+import org.apache.paimon.utils.{InternalRowUtils, TypeUtils}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{ApplyFunctionExpression, Cast, Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.jdk.CollectionConverters._
+
+/** A rule to replace Paimon functions with literal values. */
+case class ReplacePaimonFunctions(spark: SparkSession) extends Rule[LogicalPlan] {
+  private def replaceMaxPt(func: ApplyFunctionExpression): Expression = {
+    assert(func.children.size == 1)
+    assert(func.children.head.dataType == StringType)
+    if (!func.children.head.isInstanceOf[Literal]) {
+      throw new UnsupportedOperationException("Table name must be a literal")
+    }
+    val tableName = func.children.head.eval().asInstanceOf[UTF8String]
+    if (tableName == null) {
+      throw new UnsupportedOperationException("Table name cannot be null")
+    }
+    val catalogAndIdentifier = SparkUtils
+      .catalogAndIdentifier(
+        spark,
+        tableName.toString,
+        spark.sessionState.catalogManager.currentCatalog)
+    if (!catalogAndIdentifier.catalog().isInstanceOf[SparkBaseCatalog]) {
+      throw new UnsupportedOperationException(
+        s"${catalogAndIdentifier.catalog()} is not a Paimon catalog")
+    }
+
+    val table =
+      catalogAndIdentifier.catalog.asTableCatalog.loadTable(catalogAndIdentifier.identifier())
+    assert(table.isInstanceOf[SparkTable])
+    val sparkTable = table.asInstanceOf[SparkTable]
+    if (sparkTable.table.partitionKeys().size() == 0) {
+      throw new UnsupportedOperationException(s"$table is not a partitioned table")
+    }
+
+    val toplevelPartitionType =
+      TypeUtils.project(sparkTable.table.rowType, sparkTable.table.partitionKeys()).getTypeAt(0)
+    val partitionValues = sparkTable.table.newReadBuilder.newScan
+      .listPartitionEntries()
+      .asScala
+      .filter(_.fileCount() > 0)
+      .map {
+        partitionEntry => InternalRowUtils.get(partitionEntry.partition(), 0, toplevelPartitionType)
+      }
+      .sortWith(InternalRowUtils.compare(_, _, toplevelPartitionType.getTypeRoot) < 0)
+      .map(DataConverter.fromPaimon(_, toplevelPartitionType))
+    if (partitionValues.isEmpty) {
+      throw new UnsupportedOperationException(
+        s"$table has no partitions or none of the partitions have any data")
+    }
+
+    val sparkType = SparkTypeUtils.fromPaimonType(toplevelPartitionType)
+    val literal = Literal(partitionValues.last, sparkType)
+    Cast(literal, func.dataType)
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveExpressions {
+      case func: ApplyFunctionExpression
+          if func.function.name() == "max_pt" &&
+            func.function.canonicalName().startsWith("paimon") =>
+        replaceMaxPt(func)
+    }
+  }
+}
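
Because the rule folds `max_pt` into a partition literal during post-hoc resolution, a predicate on the partition column reaches the optimizer as a plain literal comparison and can drive partition pruning. A hedged sketch of the observable effect (table name hypothetical):

```scala
// After analysis the ApplyFunctionExpression is gone; the filter is
// effectively `pt = '<max partition>'` and prunes partitions as usual.
val df = spark.sql("SELECT * FROM t WHERE pt = max_pt('t')")
assert(!df.queryExecution.analyzed.toString.contains("max_pt"))
```
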
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index f73df64fb8..bfd337580d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.extensions
 
-import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver}
+import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions}
 import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries}
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
 import org.apache.paimon.spark.execution.PaimonStrategy
@@ -44,6 +44,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
       spark => SparkShimLoader.getSparkShim.createCustomResolution(spark))
     extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark))
 
+    extensions.injectPostHocResolutionRule(spark => ReplacePaimonFunctions(spark))
     extensions.injectPostHocResolutionRule(spark => PaimonPostHocResolutionRules(spark))
     extensions.injectPostHocResolutionRule(spark => PaimonIncompatiblePHRRules(spark))
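
Note that the new rule only takes effect when the Paimon session extensions are registered. A minimal setup sketch (master and warehouse path hypothetical, following the usual Paimon Spark configuration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config(
    "spark.sql.extensions",
    "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
  .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
  .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon") // hypothetical
  .getOrCreate()
```
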
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
index bb48983a85..4b0e63731d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala
@@ -66,6 +66,16 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
     }
   }
 
+  test("Paimon function: show user functions") {
+    assume(gteqSpark3_4)
+    Seq("paimon", paimonHiveCatalogName).foreach {
+      catalogName =>
+        sql(s"use $catalogName")
+        val functions = sql("show user functions").collect()
+        assert(functions.exists(_.getString(0).contains("max_pt")), catalogName)
+    }
+  }
+
   test("Paimon function: bucket join with SparkGenericCatalog") {
     sql(s"use $sparkCatalogName")
     assume(gteqSpark3_3)
@@ -105,6 +115,61 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
     sql("DROP FUNCTION myIntSum")
     checkAnswer(sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"), Seq.empty)
   }
+
+  test("Add max_pt function") {
+    Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        {
+          sql(s"use $catalogName")
+          val maxPt = if (catalogName == sparkCatalogName) {
+            "paimon.max_pt"
+          } else {
+            "max_pt"
+          }
+
+          intercept[Exception] {
+            sql(s"SELECT $maxPt(1)").collect()
+          }
+          intercept[Exception] {
+            sql(s"SELECT $maxPt()").collect()
+          }
+          withTable("t") {
+            sql("CREATE TABLE t (id INT) USING paimon")
+            intercept[Exception] {
+              sql(s"SELECT $maxPt('t')").collect()
+            }
+          }
+
+          withTable("t") {
+            sql("CREATE TABLE t (id INT) USING paimon PARTITIONED BY (p1 STRING)")
+            intercept[Exception] {
+              sql(s"SELECT $maxPt('t')").collect()
+            }
+            sql("INSERT INTO t PARTITION (p1='a') VALUES (1)")
+            sql("INSERT INTO t PARTITION (p1='b') VALUES (2)")
+            sql("INSERT INTO t PARTITION (p1='aa') VALUES (3)")
+            sql("ALTER TABLE t ADD PARTITION (p1='z')")
+            checkAnswer(sql(s"SELECT $maxPt('t')"), Row("b"))
+            checkAnswer(sql(s"SELECT id FROM t WHERE p1 = $maxPt('default.t')"), Row(2))
+          }
+
+          withTable("t") {
+            sql("CREATE TABLE t (id INT) USING paimon PARTITIONED BY (p1 INT, p2 STRING)")
+            intercept[Exception] {
+              sql(s"SELECT $maxPt('t')").collect()
+            }
+            sql("INSERT INTO t PARTITION (p1=1, p2='c') VALUES (1)")
+            sql("INSERT INTO t PARTITION (p1=2, p2='a') VALUES (2)")
+            sql("INSERT INTO t PARTITION (p1=2, p2='b') VALUES (3)")
+            sql("ALTER TABLE t ADD PARTITION (p1='9', p2='z')")
+            checkAnswer(sql(s"SELECT $maxPt('t')"), Row("2"))
+            checkAnswer(
+              sql(s"SELECT id FROM t WHERE p1 = $maxPt('default.t')"),
+              Row(2) :: Row(3) :: Nil)
+          }
+        }
+    }
+  }
 }
 
 private class MyIntSum extends UserDefinedAggregateFunction {
