shner-elmo opened a new issue, #7781:
URL: https://github.com/apache/arrow-datafusion/issues/7781
### Describe the bug
So I noticed that when executing the query `SELECT DISTINCT canonical_url FROM
dataset LIMIT 10` (through the Python bindings), it was taking a long time and
consuming most of my RAM.
I then looked at the query plan, and it is actually executed as a `GROUP BY
canonical_url`, which causes a full scan. What makes it even worse is that all
10 returned values are present in the **first** Parquet file of the dataset
(`pyarrow.Dataset.files[0]`), so it could have just stopped scanning after the
first file.
### To Reproduce
```py
import datafusion
import time
start = time.perf_counter()
ctx = datafusion.SessionContext()
ctx.register_dataset('dataset', dataset)  # `dataset` is a pyarrow dataset; see "Additional context" below
out = ctx.sql("SELECT DISTINCT canonical_url FROM dataset LIMIT 10")
print('datafusion', time.perf_counter() - start)
print(repr(out.logical_plan()))
print('-' * 10)
print(repr(out.optimized_logical_plan()))
print('-' * 10)
print(repr(out.execution_plan()))
print('-' * 10)
print(repr(out.explain()))
print('-' * 10)
```
```
datafusion 0.0003965760000141927
Limit: skip=0, fetch=10
  Distinct:
    Projection: dataset.canonical_url
      TableScan: dataset
----------
Limit: skip=0, fetch=10
  Aggregate: groupBy=[[dataset.canonical_url]], aggr=[[]]
    TableScan: dataset projection=[canonical_url]
----------
GlobalLimitExec: skip=0, fetch=10
  CoalescePartitionsExec
    LocalLimitExec: fetch=10
      AggregateExec: mode=FinalPartitioned, gby=[canonical_url@0 as canonical_url], aggr=[]
        CoalesceBatchesExec: target_batch_size=8192
          RepartitionExec: partitioning=Hash([canonical_url@0], 8), input_partitions=568
            AggregateExec: mode=Partial, gby=[canonical_url@0 as canonical_url], aggr=[]
              DatasetExec: number_of_fragments=568, projection=[canonical_url]
----------
DataFrame()
+---------------+---------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                        |
+---------------+---------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=10                                                                     |
|               |   Aggregate: groupBy=[[dataset.canonical_url]], aggr=[[]]                                   |
|               |     TableScan: dataset projection=[canonical_url]                                           |
| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                           |
|               |   CoalescePartitionsExec                                                                    |
|               |     LocalLimitExec: fetch=10                                                                |
|               |       AggregateExec: mode=FinalPartitioned, gby=[canonical_url@0 as canonical_url], aggr=[] |
|               |         CoalesceBatchesExec: target_batch_size=8192                                         |
|               |           RepartitionExec: partitioning=Hash([canonical_url@0], 8), input_partitions=568    |
|               |             AggregateExec: mode=Partial, gby=[canonical_url@0 as canonical_url], aggr=[]    |
|               |               DatasetExec: number_of_fragments=568, projection=[canonical_url]              |
|               |                                                                                             |
+---------------+---------------------------------------------------------------------------------------------+
None
----------
```
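Note that the `datafusion 0.0004` timing above only measures building the lazy DataFrame; the expensive part happens when the result is actually materialized, e.g. with something like:

```py
import time

start = time.perf_counter()
batches = out.collect()  # executes the plan; this is where all 568 fragments get scanned
print('collect', time.perf_counter() - start)
```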
### Expected behavior
Execution should stop scanning as soon as the first 10 distinct values have been found. In the physical plan above, the limit is only applied above the aggregation, so every one of the 568 fragments is scanned and hashed before the `LocalLimitExec` can cut anything off.
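To illustrate, here is a minimal sketch of the behavior I'd expect, written against plain pyarrow rather than DataFusion (the helper `first_n_distinct` is hypothetical, and `dataset` is the same object as below): it streams record batches lazily and stops as soon as 10 distinct values have been collected, so on this dataset it would only touch the first fragment.

```py
import pyarrow.dataset as ds

def first_n_distinct(dataset: ds.Dataset, column: str, n: int) -> set:
    """Collect distinct values of `column`, stopping once `n` have been seen."""
    seen = set()
    # to_batches() yields record batches lazily, fragment by fragment,
    # so breaking out of the loop skips the remaining files entirely
    for batch in dataset.to_batches(columns=[column]):
        seen.update(batch.column(0).to_pylist())
        if len(seen) >= n:
            break
    return seen

print(first_n_distinct(dataset, 'canonical_url', 10))
```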
### Additional context
For context, I'm querying a Parquet dataset, and the files are stored
locally:
```py
import pyarrow.dataset as ds
dataset = ds.dataset(...)
```
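A quick way to verify the claim that the first file already contains all 10 returned values (assuming a local `FileSystemDataset`, whose `files` attribute lists the Parquet paths):

```py
import pyarrow.compute as pc
import pyarrow.parquet as pq

# Read only the projected column from the first file of the dataset
first_file = dataset.files[0]
col = pq.read_table(first_file, columns=['canonical_url'])['canonical_url']
# All 10 values returned by the query show up among this file's uniques
print(pc.unique(col))
```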