This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9364bcbf24 [spark] Fix v1 function with CTE (#6215)
9364bcbf24 is described below

commit 9364bcbf24595feef55d75d5e4952ef727c004df
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Sep 9 13:31:52 2025 +0800

    [spark] Fix v1 function with CTE (#6215)
---
 .../extensions/RewritePaimonFunctionCommands.scala | 17 ++++++---
 .../spark/sql/PaimonOptimizationTestBase.scala     |  9 ++---
 .../spark/sql/PaimonV1FunctionTestBase.scala       | 40 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 8 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
index f295e3eef0..bad7fe3c62 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedException, 
UnresolvedFunction, UnresolvedFunctionName, UnresolvedIdentifier}
 import org.apache.spark.sql.catalyst.catalog.CatalogFunction
 import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateFunction, 
DescribeFunction, DropFunction, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateFunction, 
DescribeFunction, DropFunction, LogicalPlan, SubqueryAlias, UnresolvedWith}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, 
UNRESOLVED_FUNCTION}
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, 
LookupCatalog}
@@ -49,7 +49,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession)
       return plan
     }
 
-    plan.resolveOperatorsUp {
+    val applied = plan.resolveOperatorsUp {
       case CreateFunction(
             CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function, 
funcIdent),
             className,
@@ -82,9 +82,18 @@ case class RewritePaimonFunctionCommands(spark: SparkSession)
         } else {
           d
         }
+    }
 
-      // Needs to be done here and transform to `UnResolvedPaimonV1Function`, 
so that spark's Analyzer can resolve
-      // the 'arguments' without throwing an exception, saying that function 
is not supported.
+    // Needs to be done here and transform to `UnResolvedPaimonV1Function`, so 
that spark's Analyzer can resolve
+    // the 'arguments' without throwing an exception, saying that function is 
not supported.
+    transformPaimonV1Function(applied)
+  }
+
+  private def transformPaimonV1Function(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperatorsUp {
+      case u: UnresolvedWith =>
+        u.copy(cteRelations = u.cteRelations.map(
+          t => (t._1, 
transformPaimonV1Function(t._2).asInstanceOf[SubqueryAlias])))
       case l: LogicalPlan =>
         
l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
           case u: UnresolvedFunction =>
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 925344d49d..e55a610a02 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -59,6 +59,7 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
                                |  (SELECT AVG(b) AS avg_b FROM T)
                                |""".stripMargin)
       val optimizedPlan = Optimize.execute(query.queryExecution.analyzed)
+      val id = optimizedPlan.asInstanceOf[WithCTE].cteDefs.head.id.toInt
 
       val df = PaimonUtils.createDataset(spark, createRelationV2("T"))
       val mergedSubquery = df
@@ -82,11 +83,11 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
       val correctAnswer = WithCTE(
         OneRowRelation()
           .select(
-            extractorExpression(0, analyzedMergedSubquery.output, 0),
-            extractorExpression(0, analyzedMergedSubquery.output, 1),
-            extractorExpression(0, analyzedMergedSubquery.output, 2)
+            extractorExpression(id, analyzedMergedSubquery.output, 0),
+            extractorExpression(id, analyzedMergedSubquery.output, 1),
+            extractorExpression(id, analyzedMergedSubquery.output, 2)
           ),
-        Seq(definitionNode(analyzedMergedSubquery, 0))
+        Seq(definitionNode(analyzedMergedSubquery, id))
       )
       // Check the plan applied MergePaimonScalarSubqueries.
       comparePlans(optimizedPlan.analyze, correctAnswer.analyze)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
index 567ae910b6..61fe87758d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
@@ -174,6 +174,46 @@ abstract class PaimonV1FunctionTestBase extends PaimonSparkTestWithRestCatalogBa
       }
     }
   }
+
+  test("Paimon V1 Function: select with CTE and subquery") {
+    withUserDefinedFunction("udf_add2" -> false) {
+      sql(s"""
+             |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+      withTable("t") {
+        sql("CREATE TABLE t (a INT, b INT)")
+        sql("INSERT INTO t VALUES (1, 2), (3, 4)")
+
+        checkAnswer(
+          sql("""
+                |WITH tmp_view AS (
+                |  SELECT udf_add2(a, b) AS c1 FROM t
+                |)
+                |SELECT * FROM tmp_view
+                |""".stripMargin),
+          Seq(Row(3), Row(7))
+        )
+
+        checkAnswer(
+          sql("""
+                |WITH tmp_view AS (
+                |  SELECT udf_add2(1, 2)
+                |)
+                |SELECT * FROM tmp_view
+                |""".stripMargin),
+          Seq(Row(3))
+        )
+
+        checkAnswer(
+          sql("""
+                |SELECT * FROM (SELECT udf_add2(a, b) AS c1 FROM t)
+                |""".stripMargin),
+          Seq(Row(3), Row(7))
+        )
+      }
+    }
+  }
 }
 
 class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {

Reply via email to