andygrove commented on issue #1907:
URL: 
https://github.com/apache/datafusion-ballista/issues/1907#issuecomment-4821004737

   ## Root cause found: it's a correctness bug, not (just) a decode slowdown
   
   The earlier "same rows/task but ~8× slower decode" reading was a 
misdiagnosis. The real cause is that **each scan task reads the entire table 
instead of its own file group**, so later-wave tasks do ~N× the work *and* 
produce ~N× inflated results. The slowdown is a side effect of the over-read.
   
   ### What actually happens
   
   DataFusion 54 changed `DataSourceExec` to hand file groups to partition 
streams from a **shared work-queue** (`open_with_args` + a per-execution 
`SharedWorkSource`; `create_sibling_state` in `FileScanConfig`). The queue is 
only divided across partitions when **all partitions of one plan instance are 
polled concurrently**.
   
   Ballista executes **one partition per task on its own decoded plan 
instance** (`ShuffleWriterExec`/`SortShuffleWriterExec` calls 
`child.execute(input_partition)` for a single partition). A single partition 
polled in isolation drains the *entire* work-queue → it scans the whole table.
   
   - Wave 1 is dispatched as one `LaunchMultiTask` batch sharing one decoded 
plan instance; its partitions run concurrently and cooperatively split the file 
groups → table read once → fast and correct.
   - Each later task is dispatched individually as a slot frees → its own plan 
instance → drains all file groups alone → whole-table scan → ~N× CPU and N× 
rows.
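
The two dispatch patterns above can be illustrated with a toy model. This is only a sketch using the standard library: `WorkQueue`, `new_plan_instance`, `drain_partition`, `run_shared_wave` and `run_isolated_task` are hypothetical names made up for illustration and are not DataFusion or Ballista APIs. A "plan instance" is modelled as a shared queue of file-group ids, and "polling a partition" as popping from that queue until it is empty.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Toy stand-in for a per-instance shared work source: a queue of
/// file-group ids. Hypothetical type, not DataFusion's `SharedWorkSource`.
type WorkQueue = Arc<Mutex<VecDeque<usize>>>;

/// Each "plan instance" starts with every file group in its queue.
fn new_plan_instance(num_groups: usize) -> WorkQueue {
    Arc::new(Mutex::new((0..num_groups).collect()))
}

/// "Polls" one partition to completion: it keeps taking whatever
/// file groups are still left in the instance's queue.
fn drain_partition(queue: &WorkQueue) -> Vec<usize> {
    let mut taken = Vec::new();
    loop {
        // The lock guard is dropped at the end of this statement.
        let next = queue.lock().unwrap().pop_front();
        match next {
            Some(group) => {
                taken.push(group);
                thread::yield_now(); // give sibling partitions a chance
            }
            None => return taken,
        }
    }
}

/// Wave 1: all partitions share ONE instance and are polled concurrently,
/// so together they consume each file group exactly once.
fn run_shared_wave(num_groups: usize) -> Vec<Vec<usize>> {
    let queue = new_plan_instance(num_groups);
    let handles: Vec<_> = (0..num_groups)
        .map(|_| {
            let q = Arc::clone(&queue);
            thread::spawn(move || drain_partition(&q))
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

/// Later waves: the task gets its OWN instance and polls one partition
/// alone, so that partition drains every file group.
fn run_isolated_task(num_groups: usize) -> Vec<usize> {
    drain_partition(&new_plan_instance(num_groups))
}

fn main() {
    let n = 16;
    let shared_total: usize = run_shared_wave(n).iter().map(Vec::len).sum();
    let isolated_total = run_isolated_task(n).len();
    println!("shared wave scanned {shared_total} groups in total");
    println!("one isolated task scanned {isolated_total} groups");
}
```

In this model the shared wave scans the 16 groups once in total, while every isolated task scans all 16 by itself, which matches the N× over-read described above. How the groups split between threads in the shared case depends on scheduling; only the total is fixed.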
   
   ### Correctness impact
   
   q1 SF10, 16 partitions, `prefer_hash_join=false`:
   
   | executor slots | waves | `count_order` (A/F) | correct? |
   |---:|---:|---:|:--:|
   | 16 | 1 | 14,804,077 | ✅ |
   | 8 | 2 | 118,432,616 | ❌ (exactly 8×) |
   
   Total rows counted on `-c8` = 473,140,872 = 8 × 59,142,609, i.e. 8 × the 
table. Averages still match (the factor of 8 in 8×sum / 8×count cancels), which 
is why the bug can pass spot checks; the benchmark harness only validates 
results at SF1.
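
The arithmetic behind these numbers can be checked with a few lines. This is a sketch only: `inflate` is a hypothetical helper, and the `sum_qty` value is made up purely to show that an average is unaffected when sum and count are scaled by the same factor. The counts are the ones quoted in this comment.

```rust
/// Aggregates as they would look if every row were read `factor` times.
fn inflate(sum: f64, count: u64, factor: u64) -> (f64, u64) {
    (sum * factor as f64, count * factor)
}

fn main() {
    let count_af: u64 = 14_804_077; // correct A/F count_order (16 slots)
    let rows_counted: u64 = 59_142_609; // rows counted in one correct run
    let sum_qty: f64 = 377_518_399.0; // hypothetical sum, for illustration only

    let (bad_sum, bad_count) = inflate(sum_qty, count_af, 8);
    println!("inflated count_order = {bad_count}");
    println!("inflated total rows  = {}", rows_counted * 8);

    // The average is unchanged because the factor cancels.
    let good_avg = sum_qty / count_af as f64;
    let bad_avg = bad_sum / bad_count as f64;
    assert!((good_avg - bad_avg).abs() < 1e-9);
}
```

Counts and sums expose the bug; averages and other ratio-style columns do not.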
   
   ### Minimal standalone reproducer (pure DataFusion 54, no Ballista)
   
   Register a multi-file parquet table with `target_partitions = N`, build a 
physical plan, then:
   
   ```rust
   // TEST 1: poll all N partitions concurrently on ONE instance
   //   -> each reads ~1/N of the rows; sum == table row count.   (correct)
   // TEST 2: poll a single partition alone on its own instance
   //   -> that one partition reads the ENTIRE table.             (the bug)
   let stream = plan.execute(p, ctx.task_ctx())?; // for a lone p, reads everything
   ```
   
   Observed on 16 files (TPC-H lineitem SF10): TEST 1 → 16/16 partitions each 
~3.7M rows, sum 59,142,609; TEST 2 → `execute(0)`, `execute(8)`, `execute(15)` 
each return 59,142,609 (the whole table).
   
   This is intrinsic to `FileScanConfig` under DF54 (`scheduling_type()` is 
hardcoded `Cooperative`; `repartition_file_scans=false` does not change it). It 
worked on DF53, where `DataSourceExec::execute(p)` read file group `p` 
statically.
   
   ### Fix that works
   
   In the executor, restrict each task's `DataSourceExec` to the file group for 
its `partition_id` before execution (other group slots emptied, partition count 
preserved), so the lone `execute(partition_id)` reads only its group. With this 
change, q1 `-c8` is correct (A/F 14,804,077) and back to ~0.50 s, and q1–q14 
all return canonical TPC-H row counts. Guard: skip when `partition_id >= 
file_groups.len()` (an operator between the scan and the stage output changed 
the partition count). PR to follow.
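
The shape of that restriction, including the guard, can be sketched on plain data. This is not the actual patch: `FileGroups` and `restrict_to_partition` are hypothetical names, and file groups are modelled as plain lists of paths rather than DataFusion's own types.

```rust
/// Hypothetical stand-in for a scan's file groups: one list of file
/// paths per output partition.
type FileGroups = Vec<Vec<String>>;

/// Keeps only the group at `partition_id`, empties every other slot and
/// preserves the number of slots. Returns `None` when `partition_id` is
/// out of range, in which case the caller leaves the scan untouched.
fn restrict_to_partition(groups: &FileGroups, partition_id: usize) -> Option<FileGroups> {
    if partition_id >= groups.len() {
        return None; // guard: partition count changed above the scan
    }
    Some(
        groups
            .iter()
            .enumerate()
            .map(|(i, g)| if i == partition_id { g.clone() } else { Vec::new() })
            .collect(),
    )
}

fn main() {
    let groups: FileGroups = (0..4)
        .map(|i| vec![format!("part-{i}.parquet")])
        .collect();

    // Task for partition 2 sees only its own files; slot count stays 4.
    let restricted = restrict_to_partition(&groups, 2).expect("in range");
    println!("{restricted:?}");

    // Out-of-range partition id: skip the rewrite.
    assert!(restrict_to_partition(&groups, 4).is_none());
}
```

Emptying the other slots instead of dropping them keeps the partition count stable, so `execute(partition_id)` still addresses the same index after the rewrite.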
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

