DaxterXS commented on issue #14585:
URL: https://github.com/apache/iceberg/issues/14585#issuecomment-4343537406

   ## Root cause analysis — with source code references
   
   I hit the same issue on **AWS Glue 5.1** (Spark 3.5.6-amzn-1, Iceberg 
bundled in `aws-glue-di-package-5.1.854.jar`) with a `MERGE INTO` on an Iceberg 
table whose USING subquery contains `CURRENT_TIMESTAMP()` and 
`input_file_name()`. The exact same query works on **Glue 4.0** (Spark 3.3.0).
   
   I traced the full chain through the Spark and Iceberg source code. Here's 
what's happening:
   
   ---
   
   ### 1. Spark 3.5 introduced `groupFilterCondition` on `ReplaceData`
   
   In Spark 3.3, Iceberg had its own 
[`RewriteMergeIntoTable`](https://github.com/apache/iceberg/blob/apache-iceberg-1.3.1/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala)
 that produced `ReplaceIcebergData(writeRelation, mergeRows, relation)` — **no 
subquery wrapping the source plan**.
   
   In Spark 3.5, this logic moved into Spark core's own 
[`RewriteMergeIntoTable`](https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala).
 The new `buildReplaceDataPlan` method now calls 
[`toGroupFilterCondition`](https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala)
 which wraps the **entire source plan** in an `Exists` subquery:
   
   ```scala
   private def toGroupFilterCondition(
       relation: DataSourceV2Relation,
       source: LogicalPlan,
       cond: Expression): Expression = {
     val condWithOuterRefs = cond transformUp {
       case attr: Attribute if relation.outputSet.contains(attr) => OuterReference(attr)
       case other => other
     }
     val outerRefs = condWithOuterRefs.collect { case OuterReference(e) => e }
     Exists(Filter(condWithOuterRefs, source), outerRefs)
   }
   ```
   
   This `Exists` is stored directly on the 
[`ReplaceData`](https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala)
 node as `groupFilterCondition`:
   
   ```scala
   // Spark 3.5 — new field not present in 3.3
   case class ReplaceData(
       table, condition, query, originalTable,
       groupFilterCondition: Option[Expression] = None,  // <-- wraps source in Exists
       write)
   ```
   
   ### 2. `Exists` propagates non-determinism from the source plan
   
   
   [`PlanExpression`](https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala) (a supertype of `Exists`, via `SubqueryExpression`) computes:
   
   ```scala
   override lazy val deterministic: Boolean = children.forall(_.deterministic) && plan.deterministic
   ```
   
   If the source plan contains **any** non-deterministic expression 
(`current_timestamp()`, `uuid()`, `input_file_name()`, `now()`, etc.), the 
entire `Exists` becomes non-deterministic.
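
   To make the propagation concrete, here is a minimal, self-contained model in plain Scala. It is not Spark code: `Expr`, `Plan`, `Col`, `Call`, `Scan` and the other types are hypothetical stand-ins, and the sketch assumes a plan is deterministic only when all of its expressions and child plans are. It only mirrors the `children.forall(_.deterministic) && plan.deterministic` rule quoted above.

   ```scala
   // Toy model of determinism propagation.
   // All types are hypothetical stand-ins, not the Spark classes of the same name.
   sealed trait Expr { def deterministic: Boolean }
   final case class Col(name: String) extends Expr { val deterministic = true }
   final case class Call(fn: String, deterministic: Boolean) extends Expr

   sealed trait Plan {
     def expressions: Seq[Expr]
     def children: Seq[Plan]
     // Assumption: a plan is deterministic only if all of its own
     // expressions and all of its child plans are deterministic.
     def deterministic: Boolean =
       expressions.forall(_.deterministic) && children.forall(_.deterministic)
   }
   final case class Scan(table: String) extends Plan {
     val expressions = Seq.empty[Expr]
     val children = Seq.empty[Plan]
   }
   final case class Project(expressions: Seq[Expr], child: Plan) extends Plan {
     val children = Seq(child)
   }
   final case class Filter(cond: Expr, child: Plan) extends Plan {
     val expressions = Seq(cond)
     val children = Seq(child)
   }

   // Mirrors the quoted rule: the subquery expression inherits the
   // determinism of the plan it wraps.
   final case class Exists(plan: Plan, outerRefs: Seq[Expr]) extends Expr {
     def deterministic: Boolean =
       outerRefs.forall(_.deterministic) && plan.deterministic
   }

   object DeterminismDemo {
     // Source subquery that selects a non-deterministic column.
     val source: Plan =
       Project(Seq(Col("id"), Call("input_file_name", deterministic = false)), Scan("staging"))

     // The MERGE condition itself is deterministic.
     val mergeCond: Expr = Col("t.id = s.id")

     // Shape produced by toGroupFilterCondition: Exists(Filter(cond, source), outerRefs).
     val groupFilter: Exists = Exists(Filter(mergeCond, source), Seq(Col("t.id")))

     def main(args: Array[String]): Unit = {
       println(mergeCond.deterministic)   // true
       println(groupFilter.deterministic) // false
     }
   }
   ```

   The merge condition is deterministic on its own, yet the `Exists` that wraps it is not, because the non-deterministic call sits two levels down in the source plan.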
   
   ### 3. `CheckAnalysis` rejects `ReplaceData` with non-deterministic expressions
   
   
   [`CheckAnalysis`](https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala) has a catch-all rule that rejects non-deterministic expressions in every operator outside a fixed allow-list:
   
   ```scala
   case o if o.expressions.exists(!_.deterministic) &&
     !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
     !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] &&
     !o.isInstanceOf[Expand] && !o.isInstanceOf[Generate] &&
     !o.isInstanceOf[LateralJoin] =>
     o.failAnalysis(errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS", ...)
   ```
   
   `ReplaceData` is none of the allowed types → 
**`INVALID_NON_DETERMINISTIC_EXPRESSIONS`**.
   
   ---
   
   ### 4. The fix exists in Spark but is not applied to `ReplaceData`
   
   [SPARK-48871](https://issues.apache.org/jira/browse/SPARK-48871) (fixed in 
Spark 3.5.2 via [PR #47304](https://github.com/apache/spark/pull/47304)) 
introduced the 
[`SupportsNonDeterministicExpression`](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala)
 trait. The updated `CheckAnalysis` in [Spark 
3.5.6](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala)
 now respects it:
   
   ```scala
   private def operatorAllowsNonDeterministicExpressions(plan: LogicalPlan): Boolean = {
     plan match {
       case p: SupportsNonDeterministicExpression => p.allowNonDeterministicExpression
       case _ => false
     }
   }
   ```
   
   **However, `ReplaceData` does NOT implement this trait** — verified in 
[Spark 3.5.6 
`v2Commands.scala`](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala).
 So the escape hatch exists, but `ReplaceData` does not opt into it, which is why the check still fails on Spark 3.5.6.
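
   The interaction between the allow-list and the opt-in trait can be sketched with a small standalone model. This is plain Scala, not Spark: `Node`, `ProjectNode`, `ReplaceDataNode`, `ReplaceDataOptInNode` and `CheckDemo` are hypothetical names, the allow-list is reduced to a single operator, and the opt-in variant illustrates what implementing the trait would change rather than anything Spark 3.5.6 ships.

   ```scala
   // Toy model of the CheckAnalysis rule.
   // Every type here is a hypothetical stand-in, not a Spark class.
   trait Node {
     def name: String
     def hasNonDeterministicExpr: Boolean
   }

   // Stand-in for the opt-in trait introduced by SPARK-48871.
   trait SupportsNonDeterministicExpression {
     def allowNonDeterministicExpression: Boolean
   }

   final case class ProjectNode(hasNonDeterministicExpr: Boolean) extends Node {
     val name = "Project"
   }

   // Models ReplaceData as described above: it does not mix in the trait.
   final case class ReplaceDataNode(hasNonDeterministicExpr: Boolean) extends Node {
     val name = "ReplaceData"
   }

   // Hypothetical variant that opts in, for comparison only.
   final case class ReplaceDataOptInNode(hasNonDeterministicExpr: Boolean)
       extends Node with SupportsNonDeterministicExpression {
     val name = "ReplaceData"
     val allowNonDeterministicExpression = true
   }

   object CheckDemo {
     private def operatorAllows(node: Node): Boolean = node match {
       case p: SupportsNonDeterministicExpression => p.allowNonDeterministicExpression
       case _ => false
     }

     // Returns Left(error class) when the node is rejected, Right(node) otherwise.
     // The allow-list is reduced to ProjectNode to keep the sketch short.
     def check(node: Node): Either[String, Node] = node match {
       case n if n.hasNonDeterministicExpr &&
           !operatorAllows(n) &&
           !n.isInstanceOf[ProjectNode] =>
         Left("INVALID_NON_DETERMINISTIC_EXPRESSIONS")
       case n => Right(n)
     }

     def main(args: Array[String]): Unit = {
       println(check(ProjectNode(true)).isRight)          // true
       println(check(ReplaceDataNode(true)).isRight)      // false
       println(check(ReplaceDataOptInNode(true)).isRight) // true
     }
   }
   ```

   In this model the only difference between the rejected and accepted `ReplaceData` variants is whether the node mixes in the trait, which matches the gap described above.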
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

