EmilyMatt commented on PR #2639:
URL:
https://github.com/apache/datafusion-comet/pull/2639#issuecomment-3437940048
Ok, so the previous time I wanted to make this PR, I noticed something
interesting. Imagine something like the following code in DataFusion's
SortExec:
```rust
futures::stream::once(async move {
    while let Some(batch) = input.next().await {
        let batch = batch?;
        sorter.insert_batch(batch).await?;
    }
    sorter.sort().await
})
.try_flatten(),
```
This will essentially consume the entire input iterator, all the way up to the
operator producing the ColumnarBatch on the Scala side.
The underlying Arrow arrays can be allocated off-heap, it doesn't matter: the
wrapping ArrowArray and ArrowSchema objects are created on the JVM heap, so any
ArrayRef created from the ArrayData of such a passed Arrow array will keep a
small amount of heap memory alive.
The issue is that code like the above, which consumes the entire input iterator
before yielding anything, causes such high GC pressure that performance can
degrade by up to 10x compared to Spark.
It does not always show up in local performance runs (I only saw the horrible
performance when running on an EC2 cluster with a huge amount of data).
The only solution I've found was to do a deep copy of the ArrayData itself.
I know this seems paradoxical, but it's a real-life issue.
The best way I know to handle this is to simply do a full copy of the ArrayData
before make_array in the scan, for *all* arrays.
The unpacking can happen before that, I guess, but hopefully once DataFusion
has enough support for dictionaries this could be ignored completely.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]