andygrove opened a new issue, #1907: URL: https://github.com/apache/datafusion-ballista/issues/1907
## Describe the bug

After upgrading to DataFusion 54 (PR #1906), distributed TPC-H execution is dramatically slower. CI's TPC-H SF10 job went from ~12 min on `main` to ~52 min and then failed with `No space left on device`.

Example per-query timings (CI, SF10, `prefer_hash_join=false`):

| query | main (DF53) | DF54 branch |
|---|---:|---:|
| q1 | 2.95 s | 40 s |
| q2 | 3.38 s | 600 s |
| q3 | 5.48 s | 143 s |
| q5 | 15.8 s | died (disk full) |

This reproduces locally and is well characterized below. Single-node DataFusion 54 is fast, so the regression is in Ballista's distributed execution path.

## To Reproduce

SF10 Parquet, 1 scheduler + 1 executor (`-c 8`), release build:

```
tpch benchmark ballista --query 1 --partitions 16 --iterations 1 -c datafusion.optimizer.prefer_hash_join=false
```

## The core finding: tasks queued behind the first wave run ~8× slower

With an executor of N task slots, the **first N tasks (wave 1) run fast; every subsequent wave runs ~8× slower for identical work.**

| executor slots | partitions | waves | q1 time |
|---:|---:|---:|---:|
| 8 | 8 | 1 | 0.50 s |
| 8 | 16 | 2 | 4.2 s |
| 8 | 24 | 3 | 8.0 s |
| 16 | 16 | 1 | 0.30 s |
| 16 | 32 | 2 | 4.0 s |

- Single-node DataFusion 54 (same data, 16 target partitions): **0.21 s**.
- DataFusion 53 (same Ballista commit), distributed, q1 @ 16 partitions / 8 slots: **0.56 s** (no wave penalty).
- The penalty resets per query and is purely timing/wave-dependent, not partition-index-dependent (with 16 slots, partitions 8–15 run in wave 1 and are fast).

Per-task timing (q1 @ 16 partitions / 8 slots): wave-1 tasks take ~0.45 s each; wave-2 tasks start immediately after wave 1 (no dispatch gap) but each takes ~3.6 s of **compute** time.

## What the slow tasks are doing

A CPU sample during the slow wave shows all 8 worker threads **busy** (not parked) in:

`execute_query_stage → GroupedHashAggregateStream → ProjectionStream → FilterExecStream → CooperativeStream → FileStream → ParquetRecordBatchReader → snappy/RLE decode`
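The wave table and the per-task timings can be cross-checked with a back-of-the-envelope model. This is an illustrative sketch only, not Ballista code: the helpers `waves`, `wave_of`, and `modeled_secs` are hypothetical, and the model assumes tasks are dispatched in partition-index order, that tasks within a wave take roughly the same time, and that the scan stage dominates q1's runtime.

```rust
/// Hypothetical helper: number of scheduling waves when `partitions` tasks
/// share an executor with `slots` task slots.
fn waves(partitions: u32, slots: u32) -> u32 {
    // Ceiling division: a partially filled last wave still costs a full wave.
    (partitions + slots - 1) / slots
}

/// Hypothetical helper: the 1-based wave a partition runs in, assuming
/// tasks are dispatched in partition-index order.
fn wave_of(partition: u32, slots: u32) -> u32 {
    partition / slots + 1
}

/// Hypothetical model: one wave-1 task duration, plus one slower task
/// duration for every wave after the first.
fn modeled_secs(partitions: u32, slots: u32, wave1_task: f64, later_task: f64) -> f64 {
    let extra_waves = waves(partitions, slots) - 1;
    wave1_task + later_task * f64::from(extra_waves)
}

fn main() {
    // Per-task figures reported in the issue (q1, 16 partitions / 8 slots).
    let (wave1_task, later_task) = (0.45, 3.6);
    println!("per-task slowdown: {:.1}x", later_task / wave1_task);

    // With 8 slots partition 8 lands in wave 2; with 16 slots it is in wave 1.
    println!(
        "partition 8: wave {} (8 slots), wave {} (16 slots)",
        wave_of(8, 8),
        wave_of(8, 16)
    );

    // Compare the model against the measured 8-slot rows of the table.
    for (partitions, observed) in [(8, 0.50), (16, 4.2), (24, 8.0)] {
        let modeled = modeled_secs(partitions, 8, wave1_task, later_task);
        println!(
            "{partitions:>2} partitions / 8 slots: {} wave(s), modeled {modeled:.2} s, observed {observed:.2} s",
            waves(partitions, 8)
        );
    }
}
```

With the 8-slot per-task figures, this model gives 0.45 s, 4.05 s, and 7.65 s for 8, 16, and 24 partitions, against the measured 0.50 s, 4.2 s, and 8.0 s. The close fit is consistent with every task after the first wave paying roughly the same extra compute cost, rather than with a one-off delay such as a dispatch gap.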
So later-wave tasks are fully parallel and CPU-bound, but the **same Parquet decode is ~8× slower** than in wave 1 for the same rows per task. The merged scan-stage metrics also report a very low `scan_efficiency_ratio` (~6%) and inflated `output_rows`/`files_opened` values that scale with partition count (possibly partly a metric-merge artifact, but the wall-clock slowdown is real).

## Ruled out

All tested with config propagation working (after merging the fix for client `datafusion.*` overrides):

- Stripping `CooperativeExec` from the executed plan: no effect.
- `tokio` version: unchanged (1.52.3) between the DF53 and DF54 branches.
- Executor worker-thread priority (the `DedicatedExecutor` low priority): no effect.
- Worker thread count (×8): no effect.
- Sort-based vs hash-based shuffle: identical.
- `datafusion.optimizer.repartition_file_scans=false`: no effect (it didn't change the plan).
- A huge `datafusion.optimizer.repartition_file_min_size` (whole-file groups, no byte-range splitting): made it **much worse** (51 s), so byte-range splitting is helping, not hurting.
- `datafusion.execution.parquet.pushdown_filters=true`: no effect.
- Sharing one `RuntimeEnv` across tasks (so the DF54 `FileMetadataCache` persists instead of being fresh per task): no effect.

Caveat: a test bypassing the `DedicatedExecutor` was inadvertently applied to the pull/poll path while the cluster ran PushStaged, so "runtime bypass" is still inconclusive.

## Question / context

Has anyone seen DataFusion 54 Parquet/scan behavior where tasks that run after an initial batch decode the same data significantly slower in a shared process? (We hit DF54 Parquet footer/metadata-caching changes when upgrading Comet.) Any pointers on what changed in DF54's Parquet scan or runtime that could make later, fully parallel, CPU-bound scan tasks ~8× slower would help. Pinning this down likely needs a DF53↔DF54 scan-level comparison or CPU-counter profiling.

Blocks #1906.
