This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9364bcbf24 [spark] Fix v1 function with CTE (#6215)
9364bcbf24 is described below
commit 9364bcbf24595feef55d75d5e4952ef727c004df
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Sep 9 13:31:52 2025 +0800
[spark] Fix v1 function with CTE (#6215)
---
.../extensions/RewritePaimonFunctionCommands.scala | 17 ++++++---
.../spark/sql/PaimonOptimizationTestBase.scala | 9 ++---
.../spark/sql/PaimonV1FunctionTestBase.scala | 40 ++++++++++++++++++++++
3 files changed, 58 insertions(+), 8 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
index f295e3eef0..bad7fe3c62 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewritePaimonFunctionCommands.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedException,
UnresolvedFunction, UnresolvedFunctionName, UnresolvedIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogFunction
import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateFunction,
DescribeFunction, DropFunction, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateFunction,
DescribeFunction, DropFunction, LogicalPlan, SubqueryAlias, UnresolvedWith}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern,
UNRESOLVED_FUNCTION}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin,
LookupCatalog}
@@ -49,7 +49,7 @@ case class RewritePaimonFunctionCommands(spark: SparkSession)
return plan
}
- plan.resolveOperatorsUp {
+ val applied = plan.resolveOperatorsUp {
case CreateFunction(
CatalogAndFunctionIdentifier(v1FunctionCatalog: SupportV1Function,
funcIdent),
className,
@@ -82,9 +82,18 @@ case class RewritePaimonFunctionCommands(spark: SparkSession)
} else {
d
}
+ }
- // Needs to be done here and transform to `UnResolvedPaimonV1Function`,
so that spark's Analyzer can resolve
- // the 'arguments' without throwing an exception, saying that function
is not supported.
+ // Needs to be done here and transform to `UnResolvedPaimonV1Function`, so that spark's Analyzer can resolve
+ // the 'arguments' without throwing an exception, saying that function is not supported.
+ transformPaimonV1Function(applied)
+ }
+
+ private def transformPaimonV1Function(plan: LogicalPlan): LogicalPlan = {
+ plan.resolveOperatorsUp {
+ case u: UnresolvedWith =>
+ u.copy(cteRelations = u.cteRelations.map(
+ t => (t._1,
transformPaimonV1Function(t._2).asInstanceOf[SubqueryAlias])))
case l: LogicalPlan =>
l.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_FUNCTION)) {
case u: UnresolvedFunction =>
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index 925344d49d..e55a610a02 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -59,6 +59,7 @@ abstract class PaimonOptimizationTestBase extends
PaimonSparkTestBase with Expre
| (SELECT AVG(b) AS avg_b FROM T)
|""".stripMargin)
val optimizedPlan = Optimize.execute(query.queryExecution.analyzed)
+ val id = optimizedPlan.asInstanceOf[WithCTE].cteDefs.head.id.toInt
val df = PaimonUtils.createDataset(spark, createRelationV2("T"))
val mergedSubquery = df
@@ -82,11 +83,11 @@ abstract class PaimonOptimizationTestBase extends
PaimonSparkTestBase with Expre
val correctAnswer = WithCTE(
OneRowRelation()
.select(
- extractorExpression(0, analyzedMergedSubquery.output, 0),
- extractorExpression(0, analyzedMergedSubquery.output, 1),
- extractorExpression(0, analyzedMergedSubquery.output, 2)
+ extractorExpression(id, analyzedMergedSubquery.output, 0),
+ extractorExpression(id, analyzedMergedSubquery.output, 1),
+ extractorExpression(id, analyzedMergedSubquery.output, 2)
),
- Seq(definitionNode(analyzedMergedSubquery, 0))
+ Seq(definitionNode(analyzedMergedSubquery, id))
)
// Check the plan applied MergePaimonScalarSubqueries.
comparePlans(optimizedPlan.analyze, correctAnswer.analyze)
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
index 567ae910b6..61fe87758d 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
@@ -174,6 +174,46 @@ abstract class PaimonV1FunctionTestBase extends
PaimonSparkTestWithRestCatalogBa
}
}
}
+
+ test("Paimon V1 Function: select with CTE and subquery") {
+ withUserDefinedFunction("udf_add2" -> false) {
+ sql(s"""
+ |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ withTable("t") {
+ sql("CREATE TABLE t (a INT, b INT)")
+ sql("INSERT INTO t VALUES (1, 2), (3, 4)")
+
+ checkAnswer(
+ sql("""
+ |WITH tmp_view AS (
+ | SELECT udf_add2(a, b) AS c1 FROM t
+ |)
+ |SELECT * FROM tmp_view
+ |""".stripMargin),
+ Seq(Row(3), Row(7))
+ )
+
+ checkAnswer(
+ sql("""
+ |WITH tmp_view AS (
+ | SELECT udf_add2(1, 2)
+ |)
+ |SELECT * FROM tmp_view
+ |""".stripMargin),
+ Seq(Row(3))
+ )
+
+ checkAnswer(
+ sql("""
+ |SELECT * FROM (SELECT udf_add2(a, b) AS c1 FROM t)
+ |""".stripMargin),
+ Seq(Row(3), Row(7))
+ )
+ }
+ }
+ }
}
class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {