andygrove commented on PR #1900:
URL: 
https://github.com/apache/datafusion-ballista/pull/1900#issuecomment-4847651420

   The new AQE-on TPC-H CI job was hanging (not the AQE-off job). Root-caused 
and fixed in 382b8e1e.
   
   ## Symptom
   
   The CI job completed the AQE-off suite, then, under the adaptive (AQE) 
planner, hung on the first multi-join query (Q2) and never recovered. The hang 
reproduces locally on a 1-executor / 4-slot / 16-partition cluster and is 
independent of slot/partition counts (it hangs at `-c 4`/4, `-c 8`/8, and 
`-c 16`/16).
   
   ## Root cause: DataFusion 54 dynamic filter pushdown
   
   DataFusion 54 populates dynamic filters at runtime from an upstream operator 
(a hash-join build side, a TopK heap, a partial aggregate) and reads them in a 
downstream scan **within the same plan**. Ballista splits a plan into stages at 
shuffle/broadcast boundaries that run as **independent tasks**, so when the 
producing operator and the consuming scan land in different stages, the filter 
is never populated across the boundary and the consuming scan blocks forever.
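   
   The failure mode can be illustrated with a toy model in plain Rust. This is 
only a sketch: `ToyDynamicFilter`, `same_plan`, and `split_stages` are invented 
names, not DataFusion or Ballista types, and it assumes each stage's task ends 
up with its own copy of the filter. A timeout stands in for "blocks forever" so 
the example terminates.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Toy stand-in for a dynamic filter: starts empty, is filled once by a
/// producer, and is awaited by a consumer. Not DataFusion's real type.
#[derive(Default)]
struct ToyDynamicFilter {
    bound: Mutex<Option<i64>>,
    ready: Condvar,
}

impl ToyDynamicFilter {
    /// Producer side: publish the filter value and wake any waiters.
    fn publish(&self, value: i64) {
        *self.bound.lock().unwrap() = Some(value);
        self.ready.notify_all();
    }

    /// Consumer side: wait up to `timeout` for a value.
    /// Returns `None` if the filter was never populated in that time.
    fn wait(&self, timeout: Duration) -> Option<i64> {
        let guard = self.bound.lock().unwrap();
        let (guard, _timed_out) = self
            .ready
            .wait_timeout_while(guard, timeout, |bound| bound.is_none())
            .unwrap();
        *guard
    }
}

/// Producer and consumer share one filter instance (same plan, same process).
fn same_plan(timeout: Duration) -> Option<i64> {
    let filter = Arc::new(ToyDynamicFilter::default());
    let producer = Arc::clone(&filter);
    let handle = thread::spawn(move || producer.publish(42));
    let seen = filter.wait(timeout);
    handle.join().unwrap();
    seen
}

/// Assumption for illustration: each stage holds its own copy of the filter,
/// so the producer fills one instance while the consumer waits on another.
fn split_stages(timeout: Duration) -> Option<i64> {
    let producer_copy = Arc::new(ToyDynamicFilter::default());
    let consumer_copy = Arc::new(ToyDynamicFilter::default());
    let handle = thread::spawn(move || producer_copy.publish(42));
    let seen = consumer_copy.wait(timeout);
    handle.join().unwrap();
    seen
}
```

   With these definitions, `same_plan(Duration::from_secs(2))` yields 
`Some(42)`, while `split_stages(Duration::from_millis(50))` yields `None` 
because nothing ever fills the consumer's copy.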
   
   This deadlocks every multi-join query under AQE, because broadcast 
(`CollectLeft`) hash joins create the cross-stage producer/consumer split (the 
probe-side scan carries `predicate=DynamicFilter [ empty ]`). The static 
planner avoids the hang only because it plans sort-merge joins, which carry no 
join dynamic filter. That is why the AQE-off suite passed and the AQE-on suite 
hung.
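   
   A plan-level check for this split can be sketched as follows. The `Plan` 
enum and `stranded_filters` are hypothetical, not Ballista's planner types, and 
the plan shapes used with it are made up for illustration. The sketch assigns a 
stage to every operator, starting a new stage below each exchange, and reports 
any filter whose reading scan is not in the producing join's stage.

```rust
use std::collections::HashMap;

/// Toy physical plan. Illustrative only, not Ballista or DataFusion types.
#[derive(Debug)]
enum Plan {
    /// Leaf scan that may read a dynamic filter by id.
    Scan { reads_filter: Option<u32> },
    /// Hash join that populates dynamic filter `produces_filter` at runtime.
    HashJoin { produces_filter: u32, build: Box<Plan>, probe: Box<Plan> },
    /// Shuffle/broadcast exchange: the child runs in a separate stage.
    Exchange(Box<Plan>),
}

/// Walks the plan, recording the stage of each producer and consumer.
fn record_stages(
    plan: &Plan,
    stage: usize,
    next_stage: &mut usize,
    producers: &mut HashMap<u32, usize>,
    consumers: &mut Vec<(u32, usize)>,
) {
    match plan {
        Plan::Scan { reads_filter } => {
            if let Some(id) = reads_filter {
                consumers.push((*id, stage));
            }
        }
        Plan::HashJoin { produces_filter, build, probe } => {
            producers.insert(*produces_filter, stage);
            record_stages(build, stage, next_stage, producers, consumers);
            record_stages(probe, stage, next_stage, producers, consumers);
        }
        Plan::Exchange(child) => {
            // Everything below an exchange belongs to a fresh stage.
            *next_stage += 1;
            let child_stage = *next_stage;
            record_stages(child, child_stage, next_stage, producers, consumers);
        }
    }
}

/// Returns the ids of filters read in a stage other than the one that
/// produces them (or that have no producer at all), sorted and deduplicated.
fn stranded_filters(plan: &Plan) -> Vec<u32> {
    let mut producers = HashMap::new();
    let mut consumers = Vec::new();
    let mut next_stage = 0;
    record_stages(plan, 0, &mut next_stage, &mut producers, &mut consumers);
    let mut stranded: Vec<u32> = consumers
        .into_iter()
        .filter(|(id, stage)| producers.get(id) != Some(stage))
        .map(|(id, _)| id)
        .collect();
    stranded.sort_unstable();
    stranded.dedup();
    stranded
}
```

   A join whose probe-side scan sits directly under it reports nothing, while 
the same join with an `Exchange` between it and the scan reports the filter id 
as stranded.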
   
   The regression entered when this branch merged `main`: the DataFusion 54 
upgrade (#1906) and the AQE-on CI (#1913) landed in the same merge, and AQE-on 
had never been exercised in CI before.
   
   ## How it was confirmed
   
   - `main` (same DF54 + same AQE-on CI) passes the TPC-H job in ~24 min and 
only this branch hung, so the un-zeroed thresholds (broadcast under AQE) are 
what trigger it.
   - DF53-vs-DF54 bisect: on the pre-merge DF53 commit, Q2/Q5/Q8/Q9 all run 
fine (Q2 0.16 s, 100 rows). On DF54 they all hang.
   - `enable_dynamic_filter_pushdown=false` makes Q2 complete and removes the 
`DynamicFilter` from the plan; turning off any one of the per-type sub-flags 
(join/topk/aggregate) on its own still leaves the hang.
   
   ## Fix
   
   Pin `datafusion.optimizer.enable_dynamic_filter_pushdown=false` in the 
Ballista session defaults, alongside the other DataFusion options Ballista 
already disables because they rely on in-process state that cannot cross stage 
boundaries (e.g. `enable_physical_uncorrelated_scalar_subquery`). Long-term, 
carrying dynamic filters across stage boundaries is tracked by #1375.
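   
   For a session built by hand, the same option can be turned off through 
DataFusion's `SessionConfig::set_bool`, using the key named above. This is a 
minimal sketch that needs the `datafusion` crate; the helper name is 
illustrative, and it is not necessarily how 382b8e1e wires the setting into 
Ballista's session defaults.

```rust
use datafusion::prelude::SessionConfig;

/// Illustrative helper (not a Ballista API): a session config with
/// dynamic filter pushdown turned off.
fn config_without_dynamic_filters() -> SessionConfig {
    SessionConfig::new()
        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", false)
}
```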
   
   Verified locally: all 21 AQE-on queries (Q16 omitted, as in CI) complete 
with the default config at the CI topology.
   
   Orthogonal note: Q11 and Q15 return 0 rows under **both** AQE on and off at 
SF10. This is a pre-existing correctness issue, not introduced here, and CI 
does not catch it because it doesn't validate row counts. Worth a separate 
issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

