gengliangwang commented on code in PR #55949:
URL: https://github.com/apache/spark/pull/55949#discussion_r3262406447


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -59,19 +59,31 @@ trait UnresolvedUnaryNode extends UnaryNode with 
UnresolvedNode
 /**
  * A logical plan placeholder that holds the identifier clause string 
expression. It will be
  * replaced by the actual logical plan with the evaluated identifier string.
+ *
+ * Extends `NamedRelation` so it can occupy a `NamedRelation`-typed slot (e.g.
+ * `OverwriteByExpression.table`) directly at parse time, instead of wrapping 
the whole command.
+ *
+ * The parser always places this node inside the command's identifier slot (a 
child slot for
+ * DELETE/UPDATE/MERGE/CTAS/RTAS, or a non-child slot for 
`InsertIntoStatement.table` and
+ * `OverwriteByExpression.table` -- handled via explicit cases in 
`ResolveIdentifierClause` and
+ * `BindParameters`). It is never the substitution root of a `WITH ... 
<command>` subtree, so
+ * `CTEInChildren` semantics are not needed: any surrounding `WithCTE` 
produced by
+ * `CTESubstitution` targets the inner command directly.
  */
 case class PlanWithUnresolvedIdentifier(
     identifierExpr: Expression,
     children: Seq[LogicalPlan],
     planBuilder: (Seq[String], Seq[LogicalPlan]) => LogicalPlan)
-  extends UnresolvedNode {
+  extends UnresolvedNode with NamedRelation {
 
   def this(identifierExpr: Expression, planBuilder: Seq[String] => 
LogicalPlan) = {
     this(identifierExpr, Nil, (ident, _) => planBuilder(ident))
   }
 
   final override val nodePatterns: Seq[TreePattern] = 
Seq(PLAN_WITH_UNRESOLVED_IDENTIFIER)
 
+  override def name: String = identifierExpr.sql

Review Comment:
   `PlanWithUnresolvedIdentifier` is now reachable as a `NamedRelation` via two 
error paths I traced: `SparkStrategies.extractTableNameForError` 
(`sql/core/.../SparkStrategies.scala:1135`) and the `r: NamedRelation => 
toSQLId(r.name)` fallback in `QueryCompilationErrors.scala:3635`. Both will 
render the SQL text of the unresolved expression (e.g. `IDENTIFIER(:p)` or 
`concat('a', 'b')`) as the "table name" in error messages. That's a reasonable 
fallback, but worth a one-line comment on this override noting that 
`identifierExpr.sql` is intentionally an unresolved-placeholder stand-in for 
those error paths.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala:
##########
@@ -70,6 +71,37 @@ class ResolveIdentifierClause(earlyBatches: 
Seq[RuleExecutor[LogicalPlan]#Batch]
 
         executor.execute(p.planBuilder.apply(
           IdentifierResolution.evalIdentifierExpr(p.identifierExpr), 
p.children))
+      // `InsertIntoStatement.table` and `V2WriteCommand.table` are non-child 
LogicalPlan slots
+      // (`child = query`), so the standard `resolveOperatorsUp` traversal 
never visits
+      // placeholders inside them. Materialize them explicitly. Only 
`InsertIntoStatement` and
+      // `OverwriteByExpression` carry a parse-time placeholder today, but 
matching the
+      // `V2WriteCommand` trait keeps the rule consistent across the family.
+      case i: InsertIntoStatement if 
i.table.isInstanceOf[PlanWithUnresolvedIdentifier] && {
+        val p = i.table.asInstanceOf[PlanWithUnresolvedIdentifier]
+        p.identifierExpr.resolved && p.childrenResolved
+      } =>
+        val p = i.table.asInstanceOf[PlanWithUnresolvedIdentifier]
+        if (referredTempVars.isDefined) {
+          referredTempVars.get ++= collectTemporaryVariablesInLogicalPlan(p)
+        }
+        i.copy(table = executor.execute(p.planBuilder.apply(
+          IdentifierResolution.evalIdentifierExpr(p.identifierExpr), 
p.children)))

Review Comment:
   The PR description shows this as `case i @ InsertIntoStatement(p: 
PlanWithUnresolvedIdentifier, ...) ...`, but the code uses `isInstanceOf` in 
the guard and `asInstanceOf` in both the guard and the body (the cast appears 
twice). Either update the description, or extract the cast once — e.g. guard 
with `i.table.isInstanceOf[PlanWithUnresolvedIdentifier]`, then do the 
`identifierExpr.resolved && childrenResolved` check and the materialization in 
the body, returning `i` unchanged when not yet resolved. The same applies to 
the `V2WriteCommand` case below.



##########
sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala:
##########
@@ -2460,4 +2462,175 @@ class ParametersSuite extends SharedSparkSession {
       spark.sql("SELECT 1", Array.empty[Any]),
       Row(1))
   }
+
+  // SPARK-46625: WITH ... <write-with-IDENTIFIER> SELECT ... FROM cte
+  // The placeholder is pushed into the command's identifier slot at parse 
time, so
+  // `CTESubstitution` sees the `CTEInChildren` directly and never produces 
the invalid
+  // `WithCTE(InsertIntoStatement, ...)` / `WithCTE(CreateTableAsSelect, ...)` 
shape.
+  private def assertNoWithCTEAroundCTEInChildren(df: DataFrame): Unit = {
+    df.queryExecution.analyzed.foreach {
+      case WithCTE(_: CTEInChildren, _) =>
+        fail(s"Found invalid WithCTE(CTEInChildren, _) 
shape:\n${df.queryExecution.analyzed}")
+      case _ =>
+    }
+  }
+
+  test("SPARK-46625: WITH ... INSERT OVERWRITE TABLE IDENTIFIER(:p) SELECT ... 
FROM cte") {
+    withTable("t_cte_overwrite") {
+      sql("CREATE TABLE t_cte_overwrite (a INT) USING PARQUET")
+      sql("INSERT INTO t_cte_overwrite VALUES (10)")
+      val df = spark.sql(
+        """WITH transformation AS (SELECT 1 AS a)
+          |INSERT OVERWRITE TABLE IDENTIFIER(:tname)
+          |SELECT * FROM transformation""".stripMargin,
+        Map("tname" -> "t_cte_overwrite"))
+      assertNoWithCTEAroundCTEInChildren(df)
+      checkAnswer(spark.table("t_cte_overwrite"), Row(1))
+    }
+  }
+
+  test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) SELECT ... FROM cte") 
{
+    withTable("t_cte_into") {
+      sql("CREATE TABLE t_cte_into (a INT) USING PARQUET")
+      val df = spark.sql(
+        """WITH transformation AS (SELECT 7 AS a)
+          |INSERT INTO IDENTIFIER(:tname)
+          |SELECT * FROM transformation""".stripMargin,
+        Map("tname" -> "t_cte_into"))
+      assertNoWithCTEAroundCTEInChildren(df)
+      checkAnswer(spark.table("t_cte_into"), Row(7))
+    }
+  }
+
+  test("SPARK-46625: CREATE TABLE IDENTIFIER(:p) AS WITH ... SELECT ... FROM 
cte") {
+    withTable("t_cte_ctas") {
+      val df = spark.sql(
+        """CREATE TABLE IDENTIFIER(:tname) USING PARQUET AS
+          |WITH transformation AS (SELECT 3 AS a)
+          |SELECT * FROM transformation""".stripMargin,
+        Map("tname" -> "t_cte_ctas"))
+      assertNoWithCTEAroundCTEInChildren(df)
+      checkAnswer(spark.table("t_cte_ctas"), Row(3))
+    }
+  }
+
+  // SPARK-46625: legacy parameter-substitution mode triggers the 
parameters.scala traversal
+  // path. The placeholder lives in `InsertIntoStatement.table`, which is 
*not* a child, so this
+  // exercises the `InsertIntoStatement` special-case in `BindParameters.bind` 
that recurses into
+  // the `table` slot, and the `getDefaultTreePatternBits` override on 
`InsertIntoStatement` that
+  // exposes `table`'s tree-pattern bits for pruning.
+  test("SPARK-46625: INSERT IDENTIFIER(:p) under legacy parameter 
substitution") {
+    withSQLConf(SQLConf.LEGACY_PARAMETER_SUBSTITUTION_CONSTANTS_ONLY.key -> 
"true") {
+      withTable("t_legacy_param") {
+        sql("CREATE TABLE t_legacy_param (a INT) USING PARQUET")
+        spark.sql(
+          """WITH transformation AS (SELECT 11 AS a)
+            |INSERT INTO IDENTIFIER(:tname)
+            |SELECT * FROM transformation""".stripMargin,
+          Map("tname" -> "t_legacy_param"))
+        checkAnswer(spark.table("t_legacy_param"), Row(11))
+      }
+    }
+  }
+
+  // SPARK-46625: INSERT INTO REPLACE WHERE goes through 
`OverwriteByExpression`, whose `table`
+  // slot is typed `NamedRelation`. `PlanWithUnresolvedIdentifier` extends 
`NamedRelation` so the
+  // placeholder sits in the slot directly. Verify on the parsed plan that the 
placeholder lives
+  // in `OverwriteByExpression.table` rather than wrapping the whole command 
-- running the
+  // analyzer fully would require a v2 catalog.
+  test("SPARK-46625: WITH ... INSERT INTO IDENTIFIER(:p) REPLACE WHERE ... 
parser") {
+    // Use a non-literal-string expression so `withIdentClause` produces
+    // `PlanWithUnresolvedIdentifier` rather than short-circuiting to 
`UnresolvedRelation`.
+    val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+      """WITH transformation AS (SELECT 99 AS a)
+        |INSERT INTO IDENTIFIER('some' || '_table') REPLACE WHERE a = 10
+        |SELECT * FROM transformation""".stripMargin)
+    val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression => 
o }.getOrElse(
+      fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan"))
+    assert(overwrite.table.isInstanceOf[PlanWithUnresolvedIdentifier],
+      s"Expected OverwriteByExpression.table to be 
PlanWithUnresolvedIdentifier, " +
+        s"got ${overwrite.table.getClass.getSimpleName}:\n$parsedPlan")
+    // After CTESubstitution runs, the CTE defs should land on the command's 
children (because
+    // OverwriteByExpression is a CTEInChildren) -- never as 
`WithCTE(OverwriteByExpression, _)`.
+    val substituted = CTESubstitution.apply(parsedPlan)
+    substituted.foreach {
+      case WithCTE(_: CTEInChildren, _) =>
+        fail(s"Found invalid WithCTE(CTEInChildren, _) shape after 
CTESubstitution:\n$substituted")
+      case _ =>
+    }
+  }
+
+  // SPARK-46625: Parameter inside `IDENTIFIER(:p)` on REPLACE WHERE lives in
+  // `OverwriteByExpression.table`, which is a non-child slot. Verify that
+  // `BindParameters.bind` reaches into the slot via the explicit 
`OverwriteByExpression`
+  // recursion (parameters.scala) and that the `getDefaultTreePatternBits` 
override on
+  // `OverwriteByExpression` exposes the PARAMETER bit for pruning. Done at 
the rule level
+  // because driving REPLACE WHERE through full analysis would require a v2 
catalog.
+  test("SPARK-46625: BindParameters recurses into 
OverwriteByExpression.table") {
+    val parsedPlan = spark.sessionState.sqlParser.parsePlan(
+      """INSERT INTO IDENTIFIER(:tname) REPLACE WHERE a = 10
+        |SELECT 1 AS a""".stripMargin)
+    val overwrite = parsedPlan.collectFirst { case o: OverwriteByExpression => 
o }.getOrElse(
+      fail(s"Expected OverwriteByExpression in parsed plan:\n$parsedPlan"))
+    // Pruning prerequisite: the PARAMETER bit must be visible at the 
OverwriteByExpression
+    // level (it lives inside `table`, which is not a child); this exercises 
the
+    // `getDefaultTreePatternBits` override.
+    assert(overwrite.containsPattern(PARAMETER),
+      "OverwriteByExpression.getDefaultTreePatternBits must propagate 
`table`'s PARAMETER bit")
+
+    val bound = BindParameters.apply(
+      NameParameterizedQuery(parsedPlan, Seq("tname"), 
Seq(Literal("foo_table"))))
+    val boundOverwrite = bound.collectFirst { case o: OverwriteByExpression => 
o }.getOrElse(
+      fail(s"Expected OverwriteByExpression in bound plan:\n$bound"))
+    assert(!boundOverwrite.table.containsPattern(PARAMETER),
+      s"Expected :tname inside OverwriteByExpression.table to be bound, 
got:\n$boundOverwrite")
+  }
+
+  // SPARK-46625: `CacheTableAsSelect.tempViewName` is an `Expression` slot, 
so an
+  // `IDENTIFIER(<non-literal>)` produces an 
`ExpressionWithUnresolvedIdentifier` there instead of
+  // wrapping the entire command in a `PlanWithUnresolvedIdentifier`. Verify 
on the parsed plan
+  // that the name slot holds the expression placeholder and no 
`WithCTE(CTEInChildren, _)` shape
+  // survives `CTESubstitution` (running the cache through full analysis would 
require the temp
+  // view machinery, so this is a parser-level test).
+  test("SPARK-46625: CACHE TABLE IDENTIFIER(...) AS WITH ... SELECT ... 
parser") {

Review Comment:
   End-to-end coverage gap: the CACHE TABLE / REPLACE TABLE / `INSERT INTO 
IDENTIFIER REPLACE WHERE` tests are all parser-level. For CACHE TABLE 
specifically an e2e test doesn't need v2-catalog or temp-view machinery beyond 
what `SharedSparkSession` already provides — `CACHE TABLE IDENTIFIER('temp_v') 
AS WITH ... SELECT ...` should resolve and execute. That would exercise the 
`tempViewNameString` extraction in `DataSourceV2Strategy:781` and the new 
`CheckAnalysis` invariant case for `CacheTableAsSelect`, neither of which any 
test currently reaches.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to