arthurli-dotcom opened a new issue, #55575:
URL: https://github.com/apache/spark/issues/55575

   ### Describe the bug
   
   `PushPredicateThroughNonJoin` fails with an `AssertionError` when a filter references a passthrough (non-aliased) column of a CTE or subquery that reads from a Hive view backed by a `UNION ALL`.
   
   ### Reproduction
   
   Given a Hive view `my_view` defined as `SELECT * FROM t1 UNION ALL SELECT * FROM t2 UNION ALL SELECT * FROM t3`:
   
   ```sql
   -- Crashes
   WITH cte AS (
     SELECT id, name, tags['eventType'] AS eventtype
     FROM my_view
     WHERE ds = '2024-01-01'
   )
   SELECT * FROM cte WHERE name = 'app_launch';
   ```
   
   ```sql
   -- Works (filter directly on the view, no CTE/Project in between)
   SELECT * FROM my_view WHERE name = 'app_launch' AND ds = '2024-01-01';
   ```
   
   ```sql
   -- Works (filter inside the CTE, below the Project, directly above Union)
   WITH cte AS (
     SELECT id, name, tags['eventType'] AS eventtype
     FROM my_view
     WHERE ds = '2024-01-01'
       AND name = 'app_launch'
   )
   SELECT * FROM cte;
   ```
   
   ### Root Cause
   
   The rule applies in two iterations via `plan transform applyLocally`:
   
   **Iteration 1 — Filter + Project case** (`Optimizer.scala` ~line 1724):
   
   ```scala
   case Filter(condition, project @ Project(fields, grandChild)) =>
     val aliasMap = getAliasMap(project)
     project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))
   ```
   
   This case pushes `name = 'app_launch'` below the Project. Because `name` is a passthrough column (not in `aliasMap`), `replaceAlias` leaves the attribute unchanged, so it keeps its **original exprId** from the Project's output (`name#X`).
   
   Result: `Project(..., Filter(name#X = 'app_launch', Union))`
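   
   The passthrough behaviour described above can be sketched with a toy model. This is not Spark's code: `Expr`, `AttrRef`, `MapLookup`, the simplified `replaceAlias`, and the exprId values are all hypothetical stand-ins, and the only assumption carried over is that alias substitution is keyed by exprId.
   
   ```scala
   // Toy model for illustration only; these are not Spark's Expression classes.
   object ReplaceAliasSketch {
     sealed trait Expr
     final case class AttrRef(name: String, exprId: Long) extends Expr
     final case class Lit(value: String) extends Expr
     final case class MapLookup(map: AttrRef, key: String) extends Expr
     final case class EqualTo(left: Expr, right: Expr) extends Expr
   
     // Replace every attribute that the Project defines through an alias with the
     // aliased expression; attributes without an entry are returned unchanged.
     def replaceAlias(e: Expr, aliasMap: Map[Long, Expr]): Expr = e match {
       case a: AttrRef    => aliasMap.getOrElse(a.exprId, a)
       case EqualTo(l, r) => EqualTo(replaceAlias(l, aliasMap), replaceAlias(r, aliasMap))
       case other         => other
     }
   
     val name      = AttrRef("name", 2L)      // passthrough column, plays the role of name#X
     val eventtype = AttrRef("eventtype", 3L) // output of tags['eventType'] AS eventtype
     val tags      = AttrRef("tags", 4L)
   
     // Only the aliased column has an entry, keyed by its exprId.
     val aliasMap: Map[Long, Expr] = Map(eventtype.exprId -> MapLookup(tags, "eventType"))
   
     val pushedPassthrough = replaceAlias(EqualTo(name, Lit("app_launch")), aliasMap)
     val pushedAliased     = replaceAlias(EqualTo(eventtype, Lit("click")), aliasMap)
   
     def main(args: Array[String]): Unit = {
       println(pushedPassthrough) // still refers to name with exprId 2
       println(pushedAliased)     // eventtype replaced by the map lookup on tags
     }
   }
   ```
   
   In this model a filter on the aliased column is rewritten in terms of the child's attributes, while a filter on the passthrough column is pushed down with whatever exprId the Project output carried.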
   
   **Iteration 2 — Filter + Union case** (~line 1783):
   
   ```scala
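   case filter @ Filter(condition, union: Union) =>
     // Abridged: in the full rule, pushDownCond is the conjunction of the
     // deterministic predicates split out of `condition`.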
     val output = union.output
     val newGrandChildren = union.children.map { grandchild =>
       val newCond = pushDownCond transform {
         case e if output.exists(_.semanticEquals(e)) =>
           grandchild.output(output.indexWhere(_.semanticEquals(e)))
       }
       assert(newCond.references.subsetOf(grandchild.outputSet))  // FAILS
       Filter(newCond, grandchild)
     }
   ```
   
   The Union's `output` has `name#Y` (derived from `firstAttr.exprId` in 
`Union.output`). `semanticEquals` compares exprIds, so `name#X` does not match 
`name#Y`. The attribute is never remapped, and the assertion fails.
   
   The exprId mismatch occurs because when a Hive view is expanded, the view 
resolution layer assigns new exprIds to the Union's output attributes that 
differ from the exprIds in the Project's output references.
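   
   The effect of the mismatch can be reproduced without Spark in a small sketch. Everything here is hypothetical: `Attr`, `remap`, and `pushable` are illustrative names, the exprId values are invented, and `semanticEquals` is reduced to a plain exprId comparison. The code only mirrors the shape of the remapping and of the asserted condition.
   
   ```scala
   // Toy model, not Spark's classes: an attribute is a name plus an exprId.
   final case class Attr(name: String, exprId: Long) {
     // Semantic equality is decided by exprId, not by name.
     def semanticEquals(other: Attr): Boolean = exprId == other.exprId
   }
   
   object UnionPushdownSketch {
     // Rewrite the attributes referenced by a predicate from the Union's output
     // to the positionally matching attribute of one Union child.
     def remap(refs: Seq[Attr], unionOutput: Seq[Attr], childOutput: Seq[Attr]): Seq[Attr] =
       refs.map { ref =>
         val idx = unionOutput.indexWhere(_.semanticEquals(ref))
         if (idx >= 0) childOutput(idx) else ref // unmatched references are left as-is
       }
   
     // The condition checked by the assertion in the Filter + Union case.
     def pushable(refs: Seq[Attr], childOutput: Seq[Attr]): Boolean =
       refs.forall(childOutput.contains)
   
     val unionOutput = Seq(Attr("id", 10L), Attr("name", 11L)) // name#Y
     val childOutput = Seq(Attr("id", 20L), Attr("name", 21L)) // output of one Union child
   
     val matching = remap(Seq(Attr("name", 11L)), unionOutput, childOutput)
     val stale    = remap(Seq(Attr("name", 99L)), unionOutput, childOutput) // name#X
   
     def main(args: Array[String]): Unit = {
       println(pushable(matching, childOutput)) // true
       println(pushable(stale, childOutput))    // false
     }
   }
   ```
   
   A reference carrying the Union's own exprId is remapped and passes the check; a reference with a different exprId for the same column name is left untouched and fails it, which corresponds to the `AssertionError` above.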
   
   ### Workaround
   
   Move the filter into the CTE/subquery so it sits directly above the Union, 
skipping the Filter → Project → Union code path.
   
   ### Stack Trace
   
   ```
   java.lang.AssertionError: assertion failed
     at scala.Predef$.assert(Predef.scala:208)
     at o.a.s.sql.catalyst.optimizer.PushPredicateThroughNonJoin$$anonfun$7.$anonfun$applyOrElse$55(Optimizer.scala:2040)
     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
     at o.a.s.sql.catalyst.optimizer.PushPredicateThroughNonJoin$$anonfun$7.applyOrElse(Optimizer.scala:2035)
     at o.a.s.sql.catalyst.optimizer.PushPredicateThroughNonJoin$$anonfun$7.applyOrElse(Optimizer.scala:1962)
     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
     at o.a.s.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:521)
     at o.a.s.sql.catalyst.optimizer.PushDownPredicates$.apply(Optimizer.scala:1948)
   ```
   
   ### Environment
   
   - Spark version: 3.5.5 (AMZ fork `3.5.5-amzn-1`)
   - Java version: JDK 17
   - Scala version: 2.12.18
   - Running on EMR with Hive metastore and Iceberg tables behind the UNION view


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

