RyanJamesStewart opened a new pull request, #22205:
URL: https://github.com/apache/datafusion/pull/22205

   ## Which issue does this PR close?
   
   Closes #22164.
   
   ## Rationale for this change
   
   When partial hash aggregation hits the memory limit and switches to 
early-emit mode (`EmitTo::First(n)`), three of the four `GroupColumn::take_n` 
implementations extract the first n elements with 
`drain(0..n).collect::<Vec<_>>()`:
   
   - `bytes.rs:383` (`self.offsets`)
   - `primitive.rs:270` (`self.group_values`)
   - `bytes_view.rs:366` (`self.views`)
   
   As the issue reporter noted: `drain` does not affect the Vec's capacity, so `self.offsets` / `self.group_values` / `self.views` each keep the pre-emit allocation at a reduced length. The whole point of the OOM-triggered early emit is to release memory; with the current code the builder ends up holding the same buffer it had a moment before the emit, which defeats the purpose of the signal.
   
   (The fourth implementation, `boolean.rs::take_n`, already does the right 
thing via swap-then-truncate over `BooleanBufferBuilder`; out of scope.)
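
   To make the mechanism concrete, here is a minimal standalone sketch (plain `Vec<u64>` standing in for the builder's internal buffers, nothing DataFusion-specific) showing that `drain(0..n)` leaves the retained side with its full pre-drain capacity:

   ```rust
   fn main() {
       let n = 4;
       let mut group_values: Vec<u64> = (0..1024).collect();
       let cap_before = group_values.capacity();

       // Old pattern: emit the first n groups via drain + collect.
       let first_n: Vec<u64> = group_values.drain(0..n).collect();

       assert_eq!(first_n.len(), n);
       assert_eq!(group_values.len(), 1024 - n);
       // Vec never shrinks on its own, so the retained side still reserves the
       // full pre-emit allocation even though the emit was meant to free memory.
       assert_eq!(group_values.capacity(), cap_before);
   }
   ```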
   
   ## What changes are included in this PR?
   
   For each of the three Vec-based call sites, replace `drain(0..n).collect()` 
with `std::mem::take` + `Vec::split_off(n)`:
   
   ```rust
   // before
   let first_n = self.vec.drain(0..n).collect::<Vec<_>>();
   //   self.vec : length = len - n, capacity = pre-emit capacity (stranded)
   //   first_n  : freshly allocated, sized to n
   
   // after
   let mut first_n = std::mem::take(&mut self.vec);
   self.vec = first_n.split_off(n);
   //   self.vec : new allocation sized to len - n (capacity reclaimed)
   //   first_n  : original allocation, length n, retains pre-emit capacity
   ```
   
   `Vec::split_off(at)` keeps `[0, at)` in `self` with the original capacity 
unchanged and returns a new Vec for `[at, len)` sized to its actual length. 
Combined with `mem::take`, the *retained* side becomes the freshly-sized buffer 
and the *emitted* side owns the original allocation. That is the correct 
assignment for OOM-emit because:
   
   1. The emitted Vec is consumed immediately by `ScalarBuffer::from(Vec<T>)`, 
which is zero-copy via `Vec::into_raw_parts`; the original buffer travels into 
the output `ArrayRef` and is freed when downstream consumers drop it.
   2. The retained Vec is what the builder keeps accumulating into. A 
freshly-sized buffer is what the issue's memory-pressure signal is asking for.
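
   A minimal standalone sketch of that capacity routing (again plain `Vec<u64>`, no arrow types; the `ScalarBuffer` handoff is elided):

   ```rust
   fn main() {
       let n = 4;
       let mut group_values: Vec<u64> = (0..1024).collect();
       let cap_before = group_values.capacity();

       // New pattern: mem::take + split_off.
       let mut first_n = std::mem::take(&mut group_values);
       group_values = first_n.split_off(n);

       // Emitted side: first n values, owning the original (large) allocation.
       // In the real code this Vec goes into the output array and is freed when
       // downstream consumers drop it.
       assert_eq!(first_n.len(), n);
       assert_eq!(first_n.capacity(), cap_before); // documented by split_off

       // Retained side: a fresh buffer sized for the remaining groups; its exact
       // capacity is an implementation detail, but it no longer pins the
       // pre-emit reservation.
       assert_eq!(group_values.len(), 1024 - n);
       println!("retained capacity: {} (was {})", group_values.capacity(), cap_before);
   }
   ```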
   
   **Allocation accounting.** Same allocation count as before. `mem::take` 
swaps with `Vec::new()` (no allocation); `split_off` allocates exactly one Vec 
for the tail; the original `drain(..n).collect()` allocated exactly one Vec for 
the head. The fix changes which side gets the new allocation, not how many 
allocations happen. The win is correct capacity assignment, not reduced 
allocation work.
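
   To sanity-check the accounting claim, here is a throwaway counting-allocator sketch (std-only, not part of the PR; the `Counting` wrapper is mine) that prints how many heap allocations each pattern performs over the same data:

   ```rust
   use std::alloc::{GlobalAlloc, Layout, System};
   use std::sync::atomic::{AtomicUsize, Ordering};

   // Global allocator wrapper that counts every allocation request.
   struct Counting;
   static ALLOCS: AtomicUsize = AtomicUsize::new(0);

   unsafe impl GlobalAlloc for Counting {
       unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
           ALLOCS.fetch_add(1, Ordering::Relaxed);
           unsafe { System.alloc(layout) }
       }
       unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
           unsafe { System.dealloc(ptr, layout) }
       }
   }

   #[global_allocator]
   static A: Counting = Counting;

   fn main() {
       let n = 4;

       // Old pattern: drain + collect allocates one new Vec for the head.
       let mut v: Vec<u64> = (0..1024).collect();
       let before = ALLOCS.load(Ordering::Relaxed);
       let head: Vec<u64> = v.drain(0..n).collect();
       let drain_allocs = ALLOCS.load(Ordering::Relaxed) - before;
       assert_eq!(head.len(), n);

       // New pattern: mem::take + split_off allocates one new Vec for the tail.
       let mut v: Vec<u64> = (0..1024).collect();
       let before = ALLOCS.load(Ordering::Relaxed);
       let mut head = std::mem::take(&mut v);
       v = head.split_off(n);
       let split_allocs = ALLOCS.load(Ordering::Relaxed) - before;
       assert_eq!(head.len() + v.len(), 1024);

       // Expect one allocation in each measured region on a typical build.
       println!("drain+collect: {drain_allocs}, take+split_off: {split_allocs}");
   }
   ```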
   
   **Bonus on `bytes.rs`.** The drain-then-collect is immediately followed by `first_n_offsets.push(offset_n)` to close the offset range. With `collect()`, `first_n_offsets` ends up with capacity exactly `n`, so that push forced a reallocation. With `split_off`, `first_n_offsets` carries the pre-emit capacity and the push fits without reallocating.
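
   An illustrative sketch of that follow-up push (plain `i32` offsets standing in for the builder's offset type; the offset rebase done by the real code is omitted):

   ```rust
   fn main() {
       let n = 4;
       let mut offsets: Vec<i32> = (0..=1024).collect(); // num_groups + 1 offsets

       let mut first_n_offsets = std::mem::take(&mut offsets);
       offsets = first_n_offsets.split_off(n);
       let offset_n = offsets[0];

       // With drain(0..n).collect() the head had capacity ~n, so this push grew
       // it; with split_off it keeps the pre-emit capacity and the push fits.
       let cap_before_push = first_n_offsets.capacity();
       first_n_offsets.push(offset_n);
       assert_eq!(first_n_offsets.capacity(), cap_before_push);
   }
   ```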
   
   Diff is 16 insertions, 3 deletions across the three files.
   
   ## Are these changes tested?
   
   Yes, by existing coverage:
   
   - `test_byte_take_n` (bytes.rs)
   - `test_byte_view_take_n`, 
`test_byte_view_take_n_partial_completed_nonzero_index` (bytes_view.rs)
   - `test_emit_first_n_for_vectorized_group_values`, 
`test_hashtable_modifying_in_emit_first_n` exercise the partial-aggregation 
emit path end-to-end through the `GroupColumn` trait, covering `primitive.rs`'s 
implementation.
   
   `cargo test -p datafusion-physical-plan --lib aggregates::group_values` 
passes (26 tests, 0 failed). `cargo clippy -p datafusion-physical-plan --lib -- 
-D warnings` is clean.
   
   No new test was added: the existing tests pin the `take_n` behavior (output array correctness and remaining-side state), and the fix is behaviorally identical at that level; the change is only in which Vec each side ends up holding. The one observable difference is the heap reservation on the retained side, which is exactly what the issue reports. `Vec::capacity()` after these operations is implementation-defined, and asserting on it in a test would be flaky across allocator or std changes, so I have not added one. Happy to add a capacity assertion if you'd prefer it; flag it and I'll push.
   
   ## Are there any user-facing changes?
   
   No. No API changes, no behavior change at the operator level. Purely an 
internal memory-pressure improvement on the partial-aggregation early-emit path.
   
   ---
   
   AI-assisted. The fix shape (mem::take + split_off direction) and the 
implementation are mine; I used Claude to help survey the three call sites and 
check that I had not missed a `take_n` impl elsewhere in `group_values/`. The 
substantive comprehension steps I went through:
   
   - Verified the issue reporter's mechanism description against each of the 
three sites; the drain-capacity gap is real and the sites share the same shape.
   - Traced the emitted Vec's lifetime through `ScalarBuffer::from(Vec<T>) -> 
Buffer::from(Vec<T>) -> Vec::into_raw_parts`, which is what justifies routing 
the original allocation to the emitted side.
   - Verified the allocation count is unchanged so the PR does not make a perf 
claim that would not hold up under `run benchmarks` on a non-OOM workload. The 
improvement is qualitative (memory-pressure semantics), not throughput.
   - Confirmed `boolean.rs::take_n` is already capacity-correct (different 
primitive, `BooleanBufferBuilder` swap-then-truncate) so the PR is correctly 
scoped to the three Vec-based sites.
   
   Named unknown: I have not measured the heap-reservation improvement under an 
actual OOM-bound workload (e.g. `clickbench_partitioned` with a memory limit). 
The fix is justified by the mechanism analytically; an empirical peak-RSS 
confirmation belongs in a follow-up if you want the perf-evidence side 
strengthened.
   

