andygrove opened a new issue, #1909: URL: https://github.com/apache/datafusion-ballista/issues/1909
## Describe the bug

On DataFusion 54, TPC-H queries with an **uncorrelated scalar subquery** fail in distributed execution: **q11, q15, q22**. The executor cannot deserialize the per-stage physical plan it receives:

```
DataFusion error: Internal error: ScalarSubqueryExpr can only be deserialized as part of a surrounding ScalarSubqueryExec.
```

(Correlated and `IN` subqueries that decorrelate to joins, e.g. q2, q17, q20, are unaffected.)

These queries worked on DataFusion 53, so this is a DataFusion 54 regression for Ballista.

## To Reproduce

SF10 Parquet, 1 scheduler + 1 executor (`-c 8`), release build:

```
tpch benchmark ballista --query 15 --partitions 16 -c datafusion.optimizer.prefer_hash_join=false
```

The query never completes (see #1908: the scheduler marks the executor dead on the launch failure).

## Root cause

DataFusion 54 plans an uncorrelated scalar subquery as a physical `ScalarSubqueryExec` that wraps a `ScalarSubqueryExpr`. In `datafusion-proto`, a `ScalarSubqueryExpr` can only be **deserialized inside its surrounding `ScalarSubqueryExec`**, which seeds the per-scope results handle in the decode context:

- `datafusion-proto` `physical_plan/from_proto.rs` (`ExprType::ScalarSubquery`) requires `ctx.scalar_subquery_results()` to be present; otherwise it returns the error above.
- That handle is only set while decoding the `ScalarSubqueryExec` node (`PhysicalPlanDecodeContext::with_scalar_subquery_results`).

A **whole** plan round-trips fine. However, Ballista splits the physical plan into stages at shuffle boundaries. The scalar subquery's input contains a shuffle (e.g. an aggregate), so the `ScalarSubqueryExec` and the operator that consumes its value land in **different stages**. The consuming stage is then serialized with a *bare* `ScalarSubqueryExpr`. The executor decodes it without the surrounding `ScalarSubqueryExec`, and decoding fails.
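The toy model below makes the failure mode concrete. It uses plain Rust with the standard library only. None of these types are DataFusion's or Ballista's: `Plan`, `Expr`, `DecodeCtx`, `decode_plan`, `split_stages` and `q15_like_plan` are hypothetical stand-ins. They mimic two behaviours described above:

- The results handle exists only while decoding below the `ScalarSubqueryExec` stand-in.
- Stages are cut at every shuffle.

The model assumes a shuffle sits between the `ScalarSubqueryExec` and the consuming filter, roughly as in q15.

```rust
// Toy model of the failure; these types are hypothetical, NOT DataFusion/Ballista APIs.

#[allow(dead_code)]
#[derive(Debug)]
enum Expr {
    Column(String),
    /// Stand-in for ScalarSubqueryExpr: index into the enclosing exec's subqueries.
    ScalarSubquery(usize),
    Eq(Box<Expr>, Box<Expr>),
}

#[allow(dead_code)]
#[derive(Debug)]
enum Plan {
    Scan(String),
    /// Shuffle boundary: the child subtree becomes an upstream stage.
    Shuffle(Box<Plan>),
    /// What remains in a stage after its upstream stage is cut off.
    ShuffleRead(usize),
    Filter { predicate: Expr, input: Box<Plan> },
    /// Stand-in for ScalarSubqueryExec.
    ScalarSubquery { subqueries: Vec<Plan>, input: Box<Plan> },
}

/// The results handle is only `Some` while decoding below a ScalarSubquery node.
#[derive(Debug, Clone, Copy, Default)]
struct DecodeCtx {
    scalar_subquery_results: Option<usize>,
}

fn decode_expr(e: &Expr, ctx: DecodeCtx) -> Result<(), String> {
    match e {
        Expr::Column(_) => Ok(()),
        Expr::ScalarSubquery(_) if ctx.scalar_subquery_results.is_some() => Ok(()),
        Expr::ScalarSubquery(_) => Err("ScalarSubqueryExpr can only be deserialized as part of a \
                                        surrounding ScalarSubqueryExec"
            .to_string()),
        Expr::Eq(l, r) => decode_expr(l, ctx).and_then(|_| decode_expr(r, ctx)),
    }
}

fn decode_plan(p: &Plan, ctx: DecodeCtx) -> Result<(), String> {
    match p {
        Plan::Scan(_) | Plan::ShuffleRead(_) => Ok(()),
        Plan::Shuffle(input) => decode_plan(input, ctx),
        Plan::Filter { predicate, input } => {
            decode_expr(predicate, ctx).and_then(|_| decode_plan(input, ctx))
        }
        Plan::ScalarSubquery { subqueries, input } => {
            for s in subqueries {
                decode_plan(s, ctx)?;
            }
            // Seed the results handle for everything below this node.
            let inner = DecodeCtx { scalar_subquery_results: Some(subqueries.len()) };
            decode_plan(input, inner)
        }
    }
}

/// Cut the plan at every Shuffle; each upstream subtree is pushed as its own stage.
fn split_stages(p: Plan, stages: &mut Vec<Plan>) -> Plan {
    match p {
        Plan::Shuffle(input) => {
            let upstream = split_stages(*input, stages);
            stages.push(upstream);
            Plan::ShuffleRead(stages.len() - 1)
        }
        Plan::Filter { predicate, input } => {
            let input = Box::new(split_stages(*input, stages));
            Plan::Filter { predicate, input }
        }
        Plan::ScalarSubquery { subqueries, input } => {
            let subqueries: Vec<Plan> =
                subqueries.into_iter().map(|s| split_stages(s, stages)).collect();
            let input = Box::new(split_stages(*input, stages));
            Plan::ScalarSubquery { subqueries, input }
        }
        leaf => leaf,
    }
}

/// Rough, hypothetical shape of q15: a shuffle separates the exec from the consuming Filter.
fn q15_like_plan() -> Plan {
    Plan::ScalarSubquery {
        subqueries: vec![Plan::Scan("max_revenue".to_string())],
        input: Box::new(Plan::Shuffle(Box::new(Plan::Filter {
            predicate: Expr::Eq(
                Box::new(Expr::Column("total_revenue".to_string())),
                Box::new(Expr::ScalarSubquery(0)),
            ),
            input: Box::new(Plan::Scan("revenue".to_string())),
        }))),
    }
}

fn main() {
    // The whole plan decodes: the Filter is reached through the ScalarSubquery node.
    assert!(decode_plan(&q15_like_plan(), DecodeCtx::default()).is_ok());

    // Split into stages and decode each one on its own, as an executor would.
    let mut stages = Vec::new();
    let root = split_stages(q15_like_plan(), &mut stages);
    stages.push(root);
    for (id, stage) in stages.iter().enumerate() {
        match decode_plan(stage, DecodeCtx::default()) {
            Ok(()) => println!("stage {id}: ok"),
            Err(e) => println!("stage {id}: {e}"),
        }
    }
}
```

In this model, the upstream stage holding the `Filter` fails with the same message the executor reports. The root stage still contains the `ScalarSubquery` node and decodes fine.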
## Possible directions

- Evaluate the uncorrelated scalar subquery as its own job/stage and inline its single value as a literal into the main plan before stage splitting (closest to how a distributed engine usually handles a broadcast scalar).
- Or keep the `ScalarSubqueryExec` and all of its consumers within a single stage so the proto round-trip stays whole.
- Or thread the scalar-subquery results handle through Ballista's per-stage decode context so a split stage can reconstruct the expression.

Input from maintainers on the preferred approach would be welcome; this likely needs design discussion (and possibly an upstream `datafusion-proto` change).

## Impact

q11, q15, q22 cannot run in distributed mode on DataFusion 54. Blocks #1906.
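The first possible direction is to evaluate the scalar subquery as its own job and inline the value as a literal before stage splitting. It can be sketched on a toy model like the one above. Everything here is hypothetical:

- `inline_scalar_subqueries`, `evaluate` and `q15_like_plan` are not Ballista or DataFusion APIs.
- Values are plain `i64`.
- `evaluate` merely stands in for running each subquery through the scheduler and collecting its single row.

```rust
// Toy sketch of "inline the scalar before splitting"; hypothetical types, NOT Ballista/DataFusion APIs.

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64), // simplification: real scalar values are typed (e.g. decimals)
    /// Stand-in for ScalarSubqueryExpr: index into the enclosing exec's subqueries.
    ScalarSubquery(usize),
    Eq(Box<Expr>, Box<Expr>),
}

#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan(String),
    /// Shuffle boundary where a stage split would happen later.
    Shuffle(Box<Plan>),
    Filter { predicate: Expr, input: Box<Plan> },
    /// Stand-in for ScalarSubqueryExec.
    ScalarSubquery { subqueries: Vec<Plan>, input: Box<Plan> },
}

/// Replace subquery references with the already-computed values.
fn substitute_expr(e: Expr, values: &[i64]) -> Expr {
    match e {
        Expr::ScalarSubquery(i) => Expr::Literal(values[i]),
        Expr::Eq(l, r) => Expr::Eq(
            Box::new(substitute_expr(*l, values)),
            Box::new(substitute_expr(*r, values)),
        ),
        other => other,
    }
}

fn substitute_plan(p: Plan, values: &[i64]) -> Plan {
    match p {
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate: substitute_expr(predicate, values),
            input: Box::new(substitute_plan(*input, values)),
        },
        Plan::Shuffle(input) => Plan::Shuffle(Box::new(substitute_plan(*input, values))),
        other => other,
    }
}

/// Remove every ScalarSubquery node: run its subqueries via `evaluate` (a stand-in
/// for submitting them as a separate job) and inline the values into its input.
fn inline_scalar_subqueries(p: Plan, evaluate: &dyn Fn(&Plan) -> i64) -> Plan {
    match p {
        Plan::ScalarSubquery { subqueries, input } => {
            let values: Vec<i64> = subqueries.iter().map(|s| evaluate(s)).collect();
            // Inline nested scopes first, so the references left belong to this node.
            let input = inline_scalar_subqueries(*input, evaluate);
            substitute_plan(input, &values)
        }
        Plan::Filter { predicate, input } => Plan::Filter {
            predicate,
            input: Box::new(inline_scalar_subqueries(*input, evaluate)),
        },
        Plan::Shuffle(input) => Plan::Shuffle(Box::new(inline_scalar_subqueries(*input, evaluate))),
        leaf => leaf,
    }
}

/// Rough, hypothetical shape of q15: filter on `total_revenue = <scalar subquery>`.
fn q15_like_plan() -> Plan {
    Plan::ScalarSubquery {
        subqueries: vec![Plan::Scan("max_revenue".to_string())],
        input: Box::new(Plan::Shuffle(Box::new(Plan::Filter {
            predicate: Expr::Eq(
                Box::new(Expr::Column("total_revenue".to_string())),
                Box::new(Expr::ScalarSubquery(0)),
            ),
            input: Box::new(Plan::Scan("revenue".to_string())),
        }))),
    }
}

fn main() {
    // Pretend the separate max(total_revenue) job returned 42.
    let inlined = inline_scalar_subqueries(q15_like_plan(), &|_sub: &Plan| 42_i64);
    // No ScalarSubquery node or reference is left, so any stage split is self-contained.
    println!("{inlined:?}");
}
```

After this rewrite the plan holds only a `Literal`. Cutting it at the `Shuffle` therefore cannot leave a bare subquery reference in any stage. The trade-off is an extra job, and its latency, before the main query is split into stages. The other two directions avoid that cost, but they either constrain stage splitting or change the decode path.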
