DaxterXS commented on issue #14585: URL: https://github.com/apache/iceberg/issues/14585#issuecomment-4343537406
## Root cause analysis (with source code references)

I hit the same issue on **AWS Glue 5.1** (Spark 3.5.6-amzn-1, Iceberg bundled in `aws-glue-di-package-5.1.854.jar`) with a `MERGE INTO` on an Iceberg table whose USING subquery contains `CURRENT_TIMESTAMP()` and `input_file_name()`. The exact same query works on **Glue 4.0** (Spark 3.3.0).

I traced the full chain through the Spark and Iceberg source code. Here's what's happening:

---

### 1. Spark 3.5 introduced `groupFilterCondition` on `ReplaceData`

In Spark 3.3, Iceberg had its own [`RewriteMergeIntoTable`](https://github.com/apache/iceberg/blob/apache-iceberg-1.3.1/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala) that produced `ReplaceIcebergData(writeRelation, mergeRows, relation)`, with **no subquery wrapping the source plan**.

In Spark 3.5, this logic moved into Spark core's own [`RewriteMergeIntoTable`](https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala). The new `buildReplaceDataPlan` method now calls [`toGroupFilterCondition`](https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala), which wraps the **entire source plan** in an `Exists` subquery:

```scala
private def toGroupFilterCondition(
    relation: DataSourceV2Relation,
    source: LogicalPlan,
    cond: Expression): Expression = {

  val condWithOuterRefs = cond transformUp {
    case attr: Attribute if relation.outputSet.contains(attr) => OuterReference(attr)
    case other => other
  }
  val outerRefs = condWithOuterRefs.collect { case OuterReference(e) => e }
  Exists(Filter(condWithOuterRefs, source), outerRefs)
}
```

This `Exists` is stored directly on the [`ReplaceData`](https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala) node as `groupFilterCondition`:

```scala
// Spark 3.5: new field not present in 3.3 (other parameter types abbreviated)
case class ReplaceData(
    table,
    condition,
    query,
    originalTable,
    groupFilterCondition: Option[Expression] = None, // <-- wraps source in Exists
    write)
```

### 2. `Exists` propagates non-determinism from the source plan

[`PlanExpression`](https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala) (parent of `Exists`) computes:

```scala
override lazy val deterministic: Boolean =
  children.forall(_.deterministic) && plan.deterministic
```

If the source plan contains **any** non-deterministic expression (`current_timestamp()`, `uuid()`, `input_file_name()`, `now()`, etc.), the entire `Exists` becomes non-deterministic.

### 3. `CheckAnalysis` rejects `ReplaceData` with non-deterministic expressions

[`CheckAnalysis`](https://github.com/apache/spark/blob/v3.5.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala) has a catch-all rule:

```scala
case o if o.expressions.exists(!_.deterministic) &&
    !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
    !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] &&
    !o.isInstanceOf[Expand] && !o.isInstanceOf[Generate] &&
    !o.isInstanceOf[LateralJoin] =>
  o.failAnalysis(errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS", ...)
```

`ReplaceData` is none of the allowed types → **`INVALID_NON_DETERMINISTIC_EXPRESSIONS`**.

---

### 4. The fix exists in Spark but is not applied to `ReplaceData`

[SPARK-48871](https://issues.apache.org/jira/browse/SPARK-48871) (fixed in Spark 3.5.2 via [PR #47304](https://github.com/apache/spark/pull/47304)) introduced the [`SupportsNonDeterministicExpression`](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala) trait. The updated `CheckAnalysis` in [Spark 3.5.6](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala) now respects it:

```scala
private def operatorAllowsNonDeterministicExpressions(plan: LogicalPlan): Boolean = {
  plan match {
    case p: SupportsNonDeterministicExpression => p.allowNonDeterministicExpression
    case _ => false
  }
}
```

**However, `ReplaceData` does NOT implement this trait**, as verified in [Spark 3.5.6 `v2Commands.scala`](https://github.com/apache/spark/blob/v3.5.6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala). So the escape hatch exists but nobody applied it.
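To make the chain in sections 2 to 4 easier to follow, here is a minimal, Spark-free sketch of the same logic. Everything in it is a hypothetical stand-in, not the real Catalyst API: `Expr`, `Plan`, `NonDetCall`, `ExistsExpr`, `ProjectPlan`, `ReplaceDataPlan`, and `violations` are toy names, and the `allowNonDeterministic` flag only imitates what opting in to `SupportsNonDeterministicExpression` would do. It assumes the check is a simple tree walk, which is a simplification of `CheckAnalysis`.

```scala
// Toy model only: these types mimic the shape of Catalyst classes,
// they are NOT Spark's classes and the names are hypothetical.
object DeterminismSketch {
  sealed trait Expr { def deterministic: Boolean }
  final case class Col(name: String) extends Expr { val deterministic = true }
  // Stand-in for a non-deterministic function such as input_file_name().
  final case class NonDetCall(name: String) extends Expr { val deterministic = false }
  // Like PlanExpression: a subquery is deterministic only if its plan is.
  final case class ExistsExpr(plan: Plan) extends Expr {
    def deterministic: Boolean = plan.deterministic
  }

  sealed trait Plan extends Product {
    def expressions: Seq[Expr]
    def children: Seq[Plan]
    def deterministic: Boolean =
      expressions.forall(_.deterministic) && children.forall(_.deterministic)
  }
  final case class Scan(table: String) extends Plan {
    def expressions: Seq[Expr] = Seq.empty
    def children: Seq[Plan] = Seq.empty
  }
  final case class ProjectPlan(exprs: Seq[Expr], child: Plan) extends Plan {
    def expressions: Seq[Expr] = exprs
    def children: Seq[Plan] = Seq(child)
  }
  // allowNonDeterministic imitates an operator opting in to non-determinism.
  final case class ReplaceDataPlan(
      groupFilterCondition: Option[Expr],
      query: Plan,
      allowNonDeterministic: Boolean = false) extends Plan {
    def expressions: Seq[Expr] = groupFilterCondition.toSeq
    def children: Seq[Plan] = Seq(query)
  }

  // Simplified catch-all: report every operator that holds a
  // non-deterministic expression and is not allowed to.
  def violations(plan: Plan): Seq[String] = {
    val allowed = plan match {
      case _: ProjectPlan     => true
      case r: ReplaceDataPlan => r.allowNonDeterministic
      case _                  => false
    }
    val here =
      if (!allowed && plan.expressions.exists(!_.deterministic)) Seq(plan.productPrefix)
      else Seq.empty[String]
    here ++ plan.children.flatMap(violations)
  }

  val source: Plan =
    ProjectPlan(Seq(Col("id"), NonDetCall("input_file_name")), Scan("src"))
  val merge: ReplaceDataPlan = ReplaceDataPlan(Some(ExistsExpr(source)), source)

  def main(args: Array[String]): Unit = {
    println(violations(merge))                                   // ReplaceDataPlan is flagged
    println(violations(merge.copy(allowNonDeterministic = true))) // nothing is flagged
  }
}
```

In this toy, the `ProjectPlan` holding the non-deterministic call is accepted on its own; the failure only appears once the same plan is embedded in the `ExistsExpr` carried by `ReplaceDataPlan`, which matches the behavior described above.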
