Re: [I] Analysis to support `SortPreservingMerge` --> `ProgressiveEval` [datafusion]
leoyvens commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2757800355
@xudong963 see https://github.com/apache/datafusion/issues/10336: that flag groups Parquet scanning into a single partition. `ProgressiveEval` is proposed here to preserve the partitioning of the input scan and to run 2 inputs at a time, so it would preserve some parallelism.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
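The "run 2 inputs at a time" behavior can be illustrated with a minimal sketch. It assumes every input partition is already sorted and that the partitions arrive in an order whose concatenation is sorted. `run_partition`, `start`, and `progressive_eval` are hypothetical names, not DataFusion APIs, and OS threads stand in for DataFusion's async streams.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for executing one input partition to completion.
/// A real engine would produce an async stream of record batches instead.
fn run_partition(rows: Vec<i64>) -> Vec<i64> {
    rows
}

/// Starts one input partition on its own thread (hypothetical helper).
fn start(rows: Vec<i64>) -> JoinHandle<Vec<i64>> {
    thread::spawn(move || run_partition(rows))
}

/// Emits the inputs one after another, in the given order, without merging.
/// At most two inputs are in flight: the one being drained and the next one,
/// so some parallelism is kept while output order follows input order.
fn progressive_eval(partitions: Vec<Vec<i64>>) -> Vec<i64> {
    let mut inputs = partitions.into_iter();
    let mut output = Vec::new();
    let mut current = inputs.next().map(start);
    while let Some(handle) = current.take() {
        // Kick off the next input before waiting on the current one.
        current = inputs.next().map(start);
        output.extend(handle.join().expect("partition thread panicked"));
    }
    output
}

fn main() {
    // Each partition is sorted, and partition i ends before partition i + 1 starts.
    let partitions = vec![vec![1, 2, 3], vec![4, 5], vec![6, 7, 8]];
    println!("{:?}", progressive_eval(partitions)); // [1, 2, 3, 4, 5, 6, 7, 8]
}
```

Unlike a k-way merge, nothing here compares rows across partitions; correctness rests entirely on the non-overlap assumption, which is why the overlap analysis matters.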
alamb commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2770322823
@wiedld has created a PR with a version of the lexical range PR for review: https://github.com/influxdata/arrow-datafusion/pull/63
alamb commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2762071720
> @alamb FYI: here is a POC: https://github.com/xudong963/arrow-datafusion/pull/4

@wiedld, can you please review the above PRs and work with @xudong963 to make sure our implementations are aligned?
xudong963 commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2760211599
@alamb FYI: here is a POC: https://github.com/xudong963/arrow-datafusion/pull/4
The PR combines three PRs:
- The original PR, https://github.com/apache/datafusion/pull/13296, which optimizes SortPreservingMergeExec to avoid merging non-overlapping partitions
- https://github.com/apache/datafusion/pull/15432, which adds statistics for FileGroup, i.e., partition-level statistics
- Add ProgressiveEval operator: https://github.com/apache/datafusion/pull/10490
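One way to express a non-overlap check driven by partition-level statistics: sort the partitions by their minimum and confirm that each one ends before the next begins. This is an illustrative sketch, not the code from the linked PRs. `MinMax` and `non_overlapping_order` are hypothetical, and nulls, descending order, and multi-column keys are ignored.

```rust
/// Hypothetical per-partition statistics for a single ascending sort key.
#[derive(Debug, Clone, Copy)]
struct MinMax {
    min: i64,
    max: i64,
}

/// Returns the partition indices ordered by `min` if no two partitions overlap,
/// meaning the partitions can be concatenated instead of merged. Returns `None`
/// if any pair overlaps, in which case a merge is still required.
fn non_overlapping_order(stats: &[MinMax]) -> Option<Vec<usize>> {
    let mut order: Vec<usize> = (0..stats.len()).collect();
    order.sort_by_key(|&i| stats[i].min);
    // For an ascending key, touching endpoints (max == next min) are still safe.
    let overlaps = order
        .windows(2)
        .any(|w| stats[w[0]].max > stats[w[1]].min);
    if overlaps { None } else { Some(order) }
}

fn main() {
    let disjoint = [
        MinMax { min: 20, max: 29 },
        MinMax { min: 0, max: 9 },
        MinMax { min: 10, max: 19 },
    ];
    println!("{:?}", non_overlapping_order(&disjoint)); // Some([1, 2, 0])

    let overlapping = [MinMax { min: 0, max: 15 }, MinMax { min: 10, max: 19 }];
    println!("{:?}", non_overlapping_order(&overlapping)); // None
}
```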
alamb commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2759403638
> > * Specifically [feat: Add `ProgressiveEval` operator #10490](https://github.com/apache/datafusion/pull/10490)
>
> Hi [@alamb](https://github.com/alamb) [@wiedld](https://github.com/wiedld), how's it going? Can I do something to help?

We are a bit blocked on some of the overlap analysis -- I am going to try and pitch in and see if we can push something forward.
suremarc commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2758090094
> > The only reason it is not needed here is because there are fewer files than `target_partitions`, so this will not work if we increase the number of files or reduce `target_partitions`. If we set `target_partitions` to 1 then it requires a sort:
>
> I reread the codebase, and also think so.
>
> [FileScanConfig::split_groups_by_statistics](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569) definitely can solve the problem, then we can remove unnecessary `SortExec` which will be significant gains!
>
> One question: Is there something that makes it difficult to turn on `split_groups_by_statistics` by default?

I left my [full answer](https://github.com/apache/datafusion/issues/10336#issuecomment-2758082825) on that issue, as I don't want to take over this issue too much, but TL;DR: we need benchmarks for tables with large numbers of files.
xudong963 commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2758008447
> * Specifically [feat: Add `ProgressiveEval` operator #10490](https://github.com/apache/datafusion/pull/10490)

Hi @alamb @wiedld, how's it going? Can I do something to help?
xudong963 commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2758001706
> [@xudong963](https://github.com/xudong963) see [#10336](https://github.com/apache/datafusion/issues/10336), that flag groups Parquet scanning to a single partition. `ProgressiveEval` here is proposed to preserve the partitioning of the input scan, and to run 2 inputs at a time, so would preserve some parallelism.

Got it, thank you.
xudong963 commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2756831956
> The only reason it is not needed here is because there are fewer files than `target_partitions`, so this will not work if we increase the number of files or reduce `target_partitions`. If we set `target_partitions` to 1 then it requires a sort:

I reread the codebase and agree. [FileScanConfig::split_groups_by_statistics](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569) can definitely solve the problem, and then we can remove the unnecessary `SortExec`, which would be a significant gain!
One question: is there something that makes it difficult to turn on `split_groups_by_statistics` by default?
suremarc commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2730732469
> I think it uses [FileGroupPartitioner](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileGroupPartitioner.html) that maintains the same ordering... But maybe it is a different ordering than you have in mind

AFAICT `FileGroupPartitioner` is able to maintain the same ordering as its input, yes. But the initial
ordering of the partitions, as determined by `ListingTable`, ultimately comes from sorting by the object
store paths (see
[here](https://docs.rs/datafusion-catalog-listing/46.0.0/src/datafusion_catalog_listing/helpers.rs.html#133-136)),
which may not match the table order. This is the case in my (somewhat contrived) example above, where the
table order is `id ASC` but `id` decreases as `date` increases.
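To see why path order can defeat the declared ordering, here is a small sketch using the contrived case where `id` decreases as `date` increases. It assumes that, after the path sort, files are split into `target_partitions` contiguous chunks, which is consistent with the `file_groups` that `EXPLAIN` prints in this thread. `FileEntry`, `group_by_path`, and `group_is_ordered` are hypothetical helpers, and the file names are made up.

```rust
/// Hypothetical file entry: object store path plus min/max of the sort key `id`.
struct FileEntry {
    path: &'static str,
    min_id: i64,
    max_id: i64,
}

/// Sorts files by path, then splits them into at most `n` contiguous chunks
/// (assumed grouping rule; requires `n >= 1`).
fn group_by_path(mut files: Vec<FileEntry>, n: usize) -> Vec<Vec<FileEntry>> {
    files.sort_by(|a, b| a.path.cmp(b.path));
    let chunk_size = (files.len() + n - 1) / n; // ceiling division
    let mut groups: Vec<Vec<FileEntry>> = Vec::new();
    let mut iter = files.into_iter().peekable();
    while iter.peek().is_some() {
        groups.push(iter.by_ref().take(chunk_size).collect());
    }
    groups
}

/// True if reading the group's files in order yields ascending `id`.
fn group_is_ordered(group: &[FileEntry]) -> bool {
    group.windows(2).all(|w| w[0].max_id <= w[1].min_id)
}

fn main() {
    // One file per date; `id` decreases as the date in the path increases.
    let files = vec![
        FileEntry { path: "data/date=2025-03-04/f.parquet", min_id: 1, max_id: 1 },
        FileEntry { path: "data/date=2025-03-02/f.parquet", min_id: 3, max_id: 3 },
        FileEntry { path: "data/date=2025-03-01/f.parquet", min_id: 4, max_id: 4 },
        FileEntry { path: "data/date=2025-03-03/f.parquet", min_id: 2, max_id: 2 },
    ];
    // Simulates target_partitions = 2.
    for group in group_by_path(files, 2) {
        let paths: Vec<&str> = group.iter().map(|f| f.path).collect();
        println!("{:?} ordered by id ASC: {}", paths, group_is_ordered(&group));
    }
}
```

Both groups report `false`: each partition reads its files in path order, so a `SortExec` is still needed. Grouping by min/max statistics instead of by path is the kind of fix `split_groups_by_statistics` aims at.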
For a more realistic example where the table order and object store path
order don't match, consider a horizontally partitioned table, something like
this:
```sql
DataFusion CLI v46.0.0
> SET datafusion.execution.target_partitions=2;
0 row(s) fetched.
Elapsed 0.001 seconds.
> CREATE EXTERNAL TABLE t1 (time TIMESTAMP, date DATE, shard INT) STORED AS PARQUET LOCATION '/tmp/data/' PARTITIONED BY (date, shard) WITH ORDER (time ASC);
0 row(s) fetched.
Elapsed 0.003 seconds.
> INSERT INTO t1 VALUES
('2025-03-01 00:00:01', '2025-03-01', 0),
('2025-03-01 00:00:00', '2025-03-01', 1),
('2025-03-02 00:00:00', '2025-03-02', 0),
('2025-03-02 00:00:02', '2025-03-02', 1);
+-------+
| count |
+-------+
| 4     |
+-------+
1 row(s) fetched.
Elapsed 0.011 seconds.
> EXPLAIN SELECT * FROM t1 ORDER BY time ASC;
+---------------+------------------------------------------------------------------------+
| plan_type     | plan                                                                   |
+---------------+------------------------------------------------------------------------+
| logical_plan  | Sort: t1.time ASC NULLS LAST                                           |
|               |   TableScan: t1 projection=[time, date, shard]                         |
| physical_plan | SortPreservingMergeExec: [time@0 ASC NULLS LAST]                       |
|               |   SortExec: expr=[time@0 ASC NULLS LAST], preserve_partitioning=[true] |
|               |     DataSourceExec: file_groups={2 groups: [[tmp/data/date=2025-03-01/shard=0/8eTZY2WyyhnV7Klv.parquet, tmp/data/date=2025-03-01/shard=1/8eTZY2WyyhnV7Klv.parquet], [tmp/data/date=2025-03-02/shard=0/8eTZY2WyyhnV7Klv.parquet, tmp/data/date=2025-03-02/shard=1/8eTZY2WyyhnV7Klv.parquet]]}, projection=[time, date, shard], file_type=parquet |
|               |                                                                        |
+---------------+------------------------------------------------------------------------+
```
alamb commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2730420780
> I don't mean to toot my own horn too much, but in fact this exact use case is what [FileScanConfig::split_groups_by_statistics](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569) was written to solve. We can solve the problem locally at the DataSourceExec, which I think is the right place to do it. It's still gated behind a feature flag; I have not been able to dedicate the time to set up benchmarks for ListingTable, which I think is required to take this feature out of being experimental and ship it.

I agree -- in my mind this is all related -- when trying to take maximum advantage of pre-existing orderings I do think the optimizer should be more careful.
Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]
alamb commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2730417397
> Incidentally this is the use case I am targeting. Anyway, this query would result in at least 3 partitions, two of which are overlapping in time. If we could generalize ProgressiveEval to have multiple partition groupings, we could do something like this:

This is a neat idea -- basically a cascade of progressive evals to avoid
some (but not all) merging.
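Such a cascade can be sketched in two stages: concatenate each chain of non-overlapping partitions (the `ProgressiveEval` role), then k-way merge the resulting streams (the `SortPreservingMergeExec` role). The sketch uses in-memory `Vec<i64>` partitions with made-up values and the chains `[2, 0]` and `[1]` from the `partitions=[2, 0], [1]` example in the original proposal. `concat_chain` and `k_way_merge` are hypothetical helpers, not DataFusion code.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Concatenates the partitions of one chain in order (the ProgressiveEval step).
/// Assumes the partitions within a chain are sorted and do not overlap.
fn concat_chain(partitions: &[Vec<i64>], chain: &[usize]) -> Vec<i64> {
    chain.iter().flat_map(|&i| partitions[i].iter().copied()).collect()
}

/// K-way merge of sorted streams (the SortPreservingMerge step), using a
/// min-heap of (value, stream index, offset within stream).
fn k_way_merge(streams: Vec<Vec<i64>>) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (s, stream) in streams.iter().enumerate() {
        if let Some(&v) = stream.first() {
            heap.push(Reverse((v, s, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, s, i))) = heap.pop() {
        out.push(v);
        if let Some(&next) = streams[s].get(i + 1) {
            heap.push(Reverse((next, s, i + 1)));
        }
    }
    out
}

fn main() {
    // Partition 2 is historic; partitions 0 and 1 overlap in time.
    let partitions = vec![vec![100, 150, 200], vec![120, 180], vec![10, 50, 90]];
    let chains: Vec<Vec<usize>> = vec![vec![2, 0], vec![1]];
    let streams: Vec<Vec<i64>> = chains.iter().map(|c| concat_chain(&partitions, c)).collect();
    println!("{:?}", k_way_merge(streams)); // [10, 50, 90, 100, 120, 150, 180, 200]
}
```

Only two streams reach the merge instead of three, which is the "some (but not all) merging" saving.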
> I see that one of the target use cases for ProgressiveEval is https://github.com/apache/datafusion/issues/6672. I am a little curious to see the implementation, because the way I see it, ProgressiveEval will solve some but not all instances of this problem.

Our ProgressiveEval implementation is here:
https://github.com/influxdata/influxdb3_core/blob/26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f/iox_query/src/provider/progressive_eval.rs
(there is a PR to add it to DataFusion here:
https://github.com/apache/datafusion/pull/10490)
> Here is the issue I am worried about. When you use ListingTable you have a config option called target_partitions. By default it is set to the number of available cpu cores on the system. If the number of files exceeds target_partitions then it will start to merge files into the same partitions with no guarantees on ordering. Let me demonstrate using datafusion-cli:

> If the number of files exceeds target_partitions then it will start to merge files into the same partitions with no guarantees on ordering.

I think it uses
[`FileGroupPartitioner`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileGroupPartitioner.html),
which maintains the same ordering... But maybe it is a different ordering than
you have in mind.
You can define an order using `WITH ORDER` like this:
```sql
DataFusion CLI v46.0.1
> copy (values (4, '2025-03-01')) to '/tmp/test/1.parquet';
+-------+
| count |
+-------+
| 1     |
+-------+
1 row(s) fetched.
Elapsed 0.004 seconds.
> copy (values (3, '2025-03-02')) to '/tmp/test/2.parquet';
+-------+
| count |
+-------+
| 1     |
+-------+
1 row(s) fetched.
Elapsed 0.004 seconds.
> create external table test stored as parquet location '/tmp/test' with order (column2 ASC);
0 row(s) fetched.
Elapsed 0.006 seconds.
```
Then the sort is not needed:
```sql
> explain select * from test order by column2 asc;
+---------------+-----------------------------------------------------+
| plan_type     | plan                                                |
+---------------+-----------------------------------------------------+
| logical_plan  | Sort: test.column2 ASC NULLS LAST                   |
|               |   TableScan: test projection=[column1, column2]     |
| physical_plan | SortPreservingMergeExec: [column2@1 ASC NULLS LAST] |
|               |   DataSourceExec: file_groups={16 groups: [[tmp/test/1.parquet:0..107], [tmp/test/2.parquet:0..107], [tmp/test/1.parquet:107..214], [tmp/test/2.parquet:107..214], [tmp/test/1.parquet:214..321], ...]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet |
|               |                                                     |
+---------------+-----------------------------------------------------+
```
suremarc commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2722432514
Thanks for the writeup. The core idea makes sense, but I have a couple of comments on the design; in particular, I think we can do better in a few ways.

# 1. Some but not all partitions overlap

As I understand it, the current implementation gives up if any two partitions overlap. This is fine if your target use case involves queries with no overlapping partitions, but we can do better.
Basically, if we take the proposed algorithm and add another step at the end where we apply "first fit", then instead of producing only one lexical ordering or failing fast, we can produce multiple groups of "chained" input partitions, each ordered lexically by the sort key within itself, and we can optimize the query to use the minimum number of chains possible. Let me give an example:
```sql
SELECT * FROM recent_table_1 WHERE time > now() - INTERVAL 1 DAY
UNION ALL
SELECT * FROM recent_table_2 WHERE time > now() - INTERVAL 1 DAY
UNION ALL
SELECT * FROM historic_table WHERE time < now() - INTERVAL 1 DAY
ORDER BY time ASC
```
Incidentally, this is the use case I am targeting. Anyway, this query would result in at least 3 partitions, two of which overlap in `time`. If we could generalize `ProgressiveEval` to have multiple partition groupings, we could do something like this:
```sql
SortPreservingMergeExec: time ASC
  ProgressiveEval: partitions=[2, 0], [1]
    UnionExec: partitions=[0, 1, 2]
      TableExec: recent_table_1
      TableExec: recent_table_2
      TableExec: historic_table
```
This would concatenate partitions 2 and 0, and partition 1 remains unchanged. Then a final `SortPreservingMergeExec` is required to merge these into one sorted stream.
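The "first fit" step can be sketched as a greedy pass over per-partition min/max statistics: visit partitions in ascending order of their minimum and append each one to the first chain it does not overlap, opening a new chain otherwise. This minimal sketch assumes a single ascending sort key with no nulls and treats touching endpoints as non-overlapping. `MinMax` and `first_fit_chains` are hypothetical names, not the `split_groups_by_statistics` code, and the time ranges are made up to mirror the three tables in the example.

```rust
/// Hypothetical min/max statistics of the sort key for one input partition.
#[derive(Debug, Clone, Copy)]
struct MinMax {
    min: i64,
    max: i64,
}

/// Greedy "first fit": each partition joins the first chain whose last
/// partition ends at or before its start; otherwise it opens a new chain.
/// Every returned chain can be concatenated in order without merging.
fn first_fit_chains(stats: &[MinMax]) -> Vec<Vec<usize>> {
    let mut order: Vec<usize> = (0..stats.len()).collect();
    order.sort_by_key(|&i| stats[i].min); // stable sort: ties keep input order
    let mut chains: Vec<Vec<usize>> = Vec::new();
    for i in order {
        let fit = chains
            .iter_mut()
            .find(|chain| stats[*chain.last().unwrap()].max <= stats[i].min);
        match fit {
            Some(chain) => chain.push(i),
            None => chains.push(vec![i]),
        }
    }
    chains
}

fn main() {
    // Partitions 0 and 1 (recent tables) overlap; partition 2 (historic) precedes both.
    let stats = [
        MinMax { min: 100, max: 200 },
        MinMax { min: 100, max: 200 },
        MinMax { min: 0, max: 99 },
    ];
    println!("{:?}", first_fit_chains(&stats)); // [[2, 0], [1]]
}
```

The output matches the `partitions=[2, 0], [1]` grouping: one chain is concatenated progressively, and the final merge only has to combine two streams.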
The "first fit" algorithm has actually already been implemented in [`FileScanConfig::split_groups_by_statistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569), which uses the [`MinMaxStatistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/statistics.rs#L64) helper to analyze min/max values. However, it was written to be used in `ParquetExec` (now deprecated and, I believe, replaced with `DataSourceExec`). I believe this change could be retrofitted onto `ProgressiveEval` without much additional complexity. The analysis is already implemented; we just need to use it. Alternatively, we can take the implementation from influxdb and just add the final "first fit" step.

# 2. Scanning non-overlapping Parquet files

I see that one of the target use cases for `ProgressiveEval` is #6672. I am a little curious to see the implementation, because the way I see it, `ProgressiveEval` will solve some but not all instances of this problem.
Here is the issue I am worried about. When you use `ListingTable` you have a config option called `target_partitions`. By default it is set to the number of available CPU cores on the system. If the number of files exceeds `target_partitions`, then it will start to merge files into the same partitions with no guarantees on ordering. Let me demonstrate using `datafusion-cli`:
```sql
> SET datafusion.execution.target_partitions=2;
0 row(s) fetched.
Elapsed 0.000 seconds.
> CREATE EXTERNAL TABLE t1 (id INT, date DATE) STORED AS PARQUET LOCATION './data/' PARTITIONED BY (date) WITH ORDER (id ASC);
0 row(s) fetched.
Elapsed 0.002 seconds.
> INSERT INTO t1 VALUES (4, '2025-03-01'), (3, '2025-3-02'), (2, '2025-03-03'), (1, '2025-03-04');
+-------+
| count |
+-------+
| 4     |
+-------+
1 row(s) fetched.
Elapsed 0.004 seconds.
> EXPLAIN SELECT * FROM t1 ORDER BY id ASC;
+---------------+----------------------------+
| plan_type     | plan                       |
+---------------+----------------------------+
| logical_plan  | Sort: t1.id ASC NULLS LAST
```
