wolffcm opened a new issue, #7077:
URL: https://github.com/apache/arrow-datafusion/issues/7077
### Is your feature request related to a problem or challenge?
In IOx, many of our queries need to union data from both parquet files
(which are sorted) and `RecordBatchesExec` (which is not sorted). The output
of the union usually needs to be deduplicated (rows that share the same primary
key are removed). Deduplication can be expensive: it requires sorted input,
but it can be parallelized across multiple partitions.
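To illustrate why deduplication wants sorted input and why it can still be split across partitions, here is a minimal Rust sketch. It is not IOx's `DeduplicateExec`. The `Row` type, the keep-the-last-row policy, and the use of `DefaultHasher` for partitioning are assumptions made for this example. With rows sorted by the primary key, duplicates are adjacent, so one pass is enough. Hash-partitioning on the primary key alone sends every duplicate of a key to the same partition, so each partition can be deduplicated independently.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical row: the primary key is (tag, time); `chunk_order` breaks ties.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub tag: String,
    pub time: i64,
    pub chunk_order: u32,
    pub value: f64,
}

/// Deduplicates rows already sorted by (tag, time, chunk_order).
/// Duplicates are adjacent, so one pass suffices; under the assumed policy the
/// last row of each run (highest chunk_order) wins.
pub fn dedup_sorted(rows: &[Row]) -> Vec<Row> {
    let mut out: Vec<Row> = Vec::new();
    for row in rows {
        let same_key = out
            .last()
            .map_or(false, |last| last.tag == row.tag && last.time == row.time);
        if same_key {
            // Overwrite the previous row that has the same primary key.
            *out.last_mut().unwrap() = row.clone();
        } else {
            out.push(row.clone());
        }
    }
    out
}

/// Routes a row to one of `n` partitions by hashing only its primary key, so
/// all duplicates of a key end up in the same partition and each partition
/// can be deduplicated independently.
pub fn partition_for(row: &Row, n: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    (&row.tag, row.time).hash(&mut hasher);
    (hasher.finish() % n as u64) as usize
}
```

With unsorted input, a deduplicator would instead have to remember every key seen so far (for example in a hash map). Sorted input avoids that memory cost.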
Recently I started to experiment with using `SortPreservingRepartitionExec`
to improve deduplication performance. It works fine if the input is
completely sorted, but if it is a union of sorted and unsorted data, I end up
with a plan that sorts everything (even the parquet files, which are already
sorted) and doesn't use the sort-preserving variant of `RepartitionExec`.
That is, I end up with a plan that looks like this:
```text
DeduplicateExec: [tag@3 ASC,time@4 ASC]
  SortExec: expr=[tag@3 ASC,time@4 ASC,__chunk_order@0 ASC]
    CoalesceBatchesExec: target_batch_size=8192
      RepartitionExec: partitioning=Hash([tag@3, time@4], 4), input_partitions=4
        RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
          UnionExec
            RecordBatchesExec: batches_groups=1 batches=1 total_rows=2
            ParquetExec: file_groups={1 group: [[...]]}
```
(The back-to-back `RepartitionExec` nodes also seem undesirable here.)
But I would like to have a plan that looks like this:
```text
DeduplicateExec: [tag@3 ASC,time@4 ASC]
  CoalesceBatchesExec: target_batch_size=8192
    SortPreservingRepartitionExec: partitioning=Hash(...), input_partitions=4
      UnionExec
        SortExec: expr=[...]
          RecordBatchesExec: batches_groups=1 batches=1 total_rows=2
        ParquetExec: file_groups={1 group: [[...]]}
```
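The desired plan works because a sort-preserving repartition only needs each of its inputs to be sorted. Conceptually, it routes every row of each sorted input to an output partition by hash, then k-way merges the per-input sub-streams of each output partition. The Rust sketch below models that idea with `i64` keys in place of record batches. It is not DataFusion's implementation, and `hash_partition` is a simple modulo stand-in for a real hash. The point it demonstrates is that only the unsorted branch (the `RecordBatchesExec` side) has to be sorted first. The already-sorted parquet data can be merged as-is.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Stand-in for hashing the sort key (assumption: modulo on the key itself).
fn hash_partition(key: i64, n: usize) -> usize {
    key.rem_euclid(n as i64) as usize
}

/// Repartitions already-sorted inputs into `n` output partitions, each sorted.
/// Every input must be sorted ascending; otherwise the output is not sorted.
pub fn sort_preserving_repartition(inputs: &[Vec<i64>], n: usize) -> Vec<Vec<i64>> {
    // Step 1: split each input by hash. Each sub-stream is a subsequence of a
    // sorted input, so it stays sorted.
    let mut routed: Vec<Vec<Vec<i64>>> = vec![vec![Vec::new(); inputs.len()]; n];
    for (i, input) in inputs.iter().enumerate() {
        for &key in input {
            routed[hash_partition(key, n)][i].push(key);
        }
    }
    // Step 2: k-way merge the sorted sub-streams of each output partition.
    routed.into_iter().map(|streams| k_way_merge(&streams)).collect()
}

/// K-way merge using a min-heap of (next value, stream index, position).
fn k_way_merge(streams: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (s, stream) in streams.iter().enumerate() {
        if let Some(&first) = stream.first() {
            heap.push(Reverse((first, s, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((value, s, pos))) = heap.pop() {
        out.push(value);
        // Refill the heap from the stream that just yielded a value.
        if let Some(&next) = streams[s].get(pos + 1) {
            heap.push(Reverse((next, s, pos + 1)));
        }
    }
    out
}
```

For example, with a sorted "parquet" input `[1, 2, 5, 6]` and an unsorted "record batches" input `[8, 3, 7, 4]`, only the latter is sorted before the call. This mirrors the single `SortExec` in the desired plan.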
In practice, I have seen that re-sorting all the data makes performance
quite a bit worse, even after accounting for the gains from parallelizing
deduplication.
### Describe the solution you'd like
I think this could be addressed by updating existing planner rules (like
`replace_partition_execs`) or perhaps adding one or more new ones.
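As a rough illustration of the kind of rewrite such a rule could perform, here is a Rust sketch over a hypothetical, heavily simplified plan tree. `Plan`, its variants, and `push_sort_below_union` are made up for this example. They are not DataFusion's `ExecutionPlan` API or any of its existing optimizer rules. The toy tree omits `CoalesceBatchesExec` and the round-robin `RepartitionExec`, and it models `SortPreservingRepartitionExec` as a repartition with `preserve_order: true`. When a sort sits above an order-destroying repartition of a union, the rule sorts only the union inputs that are not already sorted and switches to the order-preserving repartition.

```rust
/// Hypothetical, heavily simplified plan tree (not DataFusion's API).
#[derive(Debug, Clone, PartialEq)]
pub enum Plan {
    /// Leaf scan; `sorted` says whether it already yields the required order.
    Scan { name: String, sorted: bool },
    Sort(Box<Plan>),
    Union(Vec<Plan>),
    /// Hash repartition that either preserves input order or does not.
    Repartition { preserve_order: bool, input: Box<Plan> },
}

impl Plan {
    /// Whether every output partition of this node has the required order.
    pub fn is_sorted(&self) -> bool {
        match self {
            Plan::Scan { sorted, .. } => *sorted,
            Plan::Sort(_) => true,
            // Assumption: each union partition comes from exactly one child.
            Plan::Union(children) => children.iter().all(Plan::is_sorted),
            Plan::Repartition { preserve_order, input } => *preserve_order && input.is_sorted(),
        }
    }
}

/// Rewrites Sort(Repartition(no order) -> Union(children)) into
/// Repartition(preserve order) -> Union(children, unsorted ones wrapped in Sort).
/// Any other plan is returned unchanged.
pub fn push_sort_below_union(plan: Plan) -> Plan {
    match plan {
        Plan::Sort(input) => match *input {
            Plan::Repartition { preserve_order: false, input: rep_input } => match *rep_input {
                Plan::Union(children) => {
                    // Sort only the children that are not already sorted.
                    let children = children
                        .into_iter()
                        .map(|c| if c.is_sorted() { c } else { Plan::Sort(Box::new(c)) })
                        .collect();
                    Plan::Repartition {
                        preserve_order: true,
                        input: Box::new(Plan::Union(children)),
                    }
                }
                // Not a union below the repartition: rebuild the original plan.
                other => Plan::Sort(Box::new(Plan::Repartition {
                    preserve_order: false,
                    input: Box::new(other),
                })),
            },
            other => Plan::Sort(Box::new(other)),
        },
        other => other,
    }
}
```

A real rule would also have to respect requirements this toy ignores, such as the exact sort expressions and whether downstream operators need a single sorted partition.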
### Describe alternatives you've considered
The alternative, I think, is what IOx does now: it does not parallelize
operations that require sorted input at all.
### Additional context
🎉 PR that introduced physical planner rules to create
`SortPreservingRepartitionExec`:
https://github.com/apache/arrow-datafusion/pull/6921
Related IOx issue: https://github.com/influxdata/influxdb_iox/issues/8292