andygrove commented on PR #2639:
URL:
https://github.com/apache/datafusion-comet/pull/2639#issuecomment-3446954076
> > Ok so the previous time I wanted to make this PR I encountered something
interesting: Imagine something like the following code on DataFusion's SortExec
> > ```rust
> > futures::stream::once(async move {
> >     while let Some(batch) = input.next().await {
> >         let batch = batch?;
> >         sorter.insert_batch(batch).await?;
> >     }
> >     sorter.sort().await
> > })
> > .try_flatten(),
> > ```
> >
> > This will essentially consume the entire input iterator, all the way to the operator producing the ColumnarBatch on the Scala side. The underlying arrow arrays can be allocated off-heap; it doesn't matter, because the wrapping ArrowArray and ArrowSchema objects are created using heap memory, so any ArrayRef created from the ArrayData of a passed arrow array will hold a small amount of heap memory. The issue is that code like the above, which consumes the entire input iterator, causes such high GC pressure that performance can decrease by up to 10x compared to Spark. It will not always show up in local performance runs (I saw the horrible performance when running on an EC2 cluster with a huge amount of data). The only solution I've found was to do a deep copy of the ArrayData itself. I know this seems paradoxical, but it's a real-life issue. The best way I know to handle this is to do a full copy of the ArrayData before make_array in the scan for _all_ arrays. The unpacking can happen before that, I guess, but hopefully DataFusion will have enough support for dictionaries that this can be ignored completely.
> > When we use off-heap memory, we naturally want to reduce the executor heap size, which exacerbates the issue.
>
> Thanks, this is really helpful. I will experiment with this.
Claude's analysis of the PR and this comment:
## The Core Problem

### Memory Model Mismatch
When Arrow arrays are passed from Spark (JVM) to DataFusion (Rust):
- Data buffers: Can be allocated off-heap (native memory)
- Wrapper objects (ArrowArray, ArrowSchema): Always allocated on Java heap
- Even though the actual data is off-heap, each ArrayRef needs these small
heap-allocated wrappers
### GC Pressure from Buffering Operators
The issue arises with operators like SortExec that consume their entire
input:
```rust
futures::stream::once(async move {
    while let Some(batch) = input.next().await {
        let batch = batch?;                // propagate input errors
        sorter.insert_batch(batch).await?; // accumulates all batches
    }
    sorter.sort().await                    // only then produces output
})
.try_flatten()
```
This pattern:
1. Consumes the entire input iterator before producing output
2. Creates many wrapper objects (one set per batch)
3. Keeps all wrappers alive until sorting completes
4. Causes severe GC pressure as thousands of small objects accumulate on
the heap
## Why It's Worse with Off-Heap Memory
When using off-heap memory for performance, users typically:
- Reduce the executor heap size (since the data lives off-heap)
- The smaller heap means GC pressure from the wrapper objects becomes
catastrophic
- This can cause 10x performance degradation on clusters with large data
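As a rough illustration (these values are hypothetical, not taken from the PR), an off-heap deployment often looks like this, leaving only a small heap for the per-batch wrapper objects to churn through:

```properties
# Data buffers live off-heap...
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=16g
# ...so the on-heap allocation is deliberately kept small,
# leaving little headroom for the per-batch wrapper objects.
spark.executor.memory=4g
```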
## Why Deep Copy Solves It (The Paradox)
The comment suggests doing a deep copy of ArrayData before make_array in
the scan. This seems counterintuitive but works because:
1. Immediate Materialization: Deep copy fully materializes data into new
arrays
2. Immediate GC: Original wrapper objects can be garbage collected right
away
3. Clean Boundaries: Each batch owns its data completely
4. Less GC Thrashing: Even though copying costs CPU, it's cheaper than
continuous GC pauses
Without copy: many small wrapper objects → constant GC pressure.
With copy: upfront copy cost → clean memory lifecycle → smooth execution.
## Current PR Context
Looking at your PR, you've:
- Removed the separate CopyExec operator
- Moved copy_array and copy_or_unpack_array functions into scan.rs
- The copy happens at scan time (lines 270-278 in scan.rs):
```rust
let array = if arrow_ffi_safe {
copy_or_unpack_array(&array, &CopyMode::UnpackOrClone)?
} else {
copy_array(&array) // Deep copy
};
```
The commenter is saying: Make sure this deep copy happens for all arrays
to prevent the GC pressure issue, especially for operators that buffer entire
inputs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]