Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-04-05 Thread via GitHub


leoyvens commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2757800355

   @xudong963 see https://github.com/apache/datafusion/issues/10336: that flag 
groups Parquet scanning into a single partition. `ProgressiveEval`, as proposed 
here, preserves the partitioning of the input scan and runs two inputs at a 
time, so it would preserve some parallelism.
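
   To make the contrast concrete, here is a minimal Rust sketch of the core idea, assuming each input partition is already sorted: if the partitions' key ranges do not overlap, sorted output is just the partitions read back to back in order of their minimum key, with no merge. Partitions are modeled as in-memory `Vec<i64>`s and `progressive_concat` is a hypothetical helper; the real operator works on async `RecordBatch` streams, and its "2 inputs at a time" prefetching is not modeled here.

   ```rust
   // Sketch only: partitions are in-memory sorted vectors, not RecordBatch streams.

   /// Concatenates partitions in order of their minimum key, or returns `None`
   /// if that would not produce sorted output (an unsorted partition, or two
   /// partitions whose key ranges overlap).
   fn progressive_concat(mut partitions: Vec<Vec<i64>>) -> Option<Vec<i64>> {
       // Each partition must already be sorted ascending.
       if partitions.iter().any(|p| p.windows(2).any(|w| w[0] > w[1])) {
           return None;
       }
       // Empty partitions contribute no rows and have no key range.
       partitions.retain(|p| !p.is_empty());
       // Read partitions in order of their minimum key (first element, since sorted).
       partitions.sort_by_key(|p| p[0]);
       // Adjacent ranges must not overlap: previous max <= next min.
       if partitions
           .windows(2)
           .any(|w| *w[0].last().unwrap() > w[1][0])
       {
           return None;
       }
       Some(partitions.into_iter().flatten().collect())
   }
   ```

   Returning `None` corresponds to the case where plain concatenation is not valid and a merge (or sort) is still required.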


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-04-01 Thread via GitHub


alamb commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2770322823

   @wiedld has created a PR with a version of the lexical range PR for review:
   - https://github.com/influxdata/arrow-datafusion/pull/63
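
   As a rough illustration of lexical range analysis for a multi-column sort key, the sketch below derives conservative bounds from per-column min/max statistics, relying on Rust's lexicographic tuple ordering. `ColumnStats`, `LexRange`, `lex_range` and `can_precede` are hypothetical names and are not taken from the linked PR, which may compute ranges differently.

   ```rust
   // Hypothetical per-column statistics for a sort key `(a ASC, b ASC)`.
   #[derive(Debug, Clone, Copy)]
   struct ColumnStats {
       min: i64,
       max: i64,
   }

   /// Conservative lexical bounds: every row `(a, b)` of the partition satisfies
   /// `lo <= (a, b) <= hi` under lexicographic (tuple) ordering.
   #[derive(Debug, Clone, Copy)]
   struct LexRange {
       lo: (i64, i64),
       hi: (i64, i64),
   }

   fn lex_range(a: ColumnStats, b: ColumnStats) -> LexRange {
       // (min_a, min_b) <= (a, b): either a > min_a, or a == min_a and b >= min_b.
       // The upper bound follows symmetrically from the max values.
       LexRange { lo: (a.min, b.min), hi: (a.max, b.max) }
   }

   /// True if every row of `first` sorts at or before every row of `second`,
   /// so the two partitions could be emitted back to back without merging.
   fn can_precede(first: &LexRange, second: &LexRange) -> bool {
       first.hi <= second.lo
   }
   ```

   Because the bounds come from independent per-column statistics, they can be looser than a partition's true first and last rows, so this check may report an overlap where none exists, but never the reverse.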



Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-03-30 Thread via GitHub


alamb commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2762071720

   > @alamb Fyi: here is a POC: 
https://github.com/xudong963/arrow-datafusion/pull/4
   
   @wiedld - can you please review the above PRs and work with @xudong963 to 
make sure our implementations are aligned?



Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-03-27 Thread via GitHub


xudong963 commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2760211599

   @alamb FYI: here is a POC: https://github.com/xudong963/arrow-datafusion/pull/4
   
   The PR combines three PRs:
   
   - The original PR, https://github.com/apache/datafusion/pull/13296, which 
optimizes `SortPreservingMergeExec` to avoid merging non-overlapping partitions.
   - https://github.com/apache/datafusion/pull/15432, which adds statistics for 
`FileGroup`, i.e. partition-level statistics.
   - The PR adding the `ProgressiveEval` operator: 
https://github.com/apache/datafusion/pull/10490
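
   Partition-level statistics can be thought of as folding per-file min/max values for the sort column into one range per file group. A minimal sketch under that assumption, using hypothetical `MinMax` and `group_range` names rather than DataFusion's `Statistics` API:

   ```rust
   // Hypothetical min/max statistics for one sort-key column.
   #[derive(Debug, Clone, Copy, PartialEq)]
   struct MinMax {
       min: i64,
       max: i64,
   }

   /// Folds per-file statistics into one range for a file group (partition).
   /// `None` for any file, or an empty group, yields `None`: the range is unknown.
   fn group_range(files: &[Option<MinMax>]) -> Option<MinMax> {
       let mut acc: Option<MinMax> = None;
       for file in files {
           let f = (*file)?; // one file without statistics makes the group unknown
           acc = Some(match acc {
               None => f,
               Some(a) => MinMax {
                   min: a.min.min(f.min),
                   max: a.max.max(f.max),
               },
           });
       }
       acc
   }
   ```

   Treating one file with missing statistics as making the whole group unknown is the conservative choice: the optimizer should not assume a non-overlap it cannot prove.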



Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-03-27 Thread via GitHub


alamb commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2759403638

   > > * Specifically [feat: Add  `ProgressiveEval` operator 
#10490](https://github.com/apache/datafusion/pull/10490)
   > 
   > Hi [@alamb](https://github.com/alamb) [@wiedld](https://github.com/wiedld) 
, how's it going? Can I do something to help?
   
   We are a bit blocked on some of the overlap analysis. I am going to try to 
pitch in and see if we can push something forward.



Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-03-27 Thread via GitHub


suremarc commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2758090094

   > > The only reason it is not needed here is because there are fewer files 
than `target_partitions`, so this will not work if we increase the number of 
files or reduce `target_partitions`. If we set `target_partitions` to 1 then it 
requires a sort:
   > 
   > I reread the codebase, and also think so.
   > 
   > 
[FileScanConfig::split_groups_by_statistics](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569)
 definitely can solve the problem, then we can remove unnecessary `SortExec` 
which will be significant gains!
   > 
   > One question: Is there something that makes it difficult to turn on 
`split_groups_by_statistics`  by default?
   
   I left my [full 
answer](https://github.com/apache/datafusion/issues/10336#issuecomment-2758082825)
 on that issue, as I don't want to derail this issue too much. TL;DR: we need 
benchmarks for tables with large numbers of files.



Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-03-27 Thread via GitHub


xudong963 commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2758008447

   > * Specifically [feat: Add  `ProgressiveEval` operator 
#10490](https://github.com/apache/datafusion/pull/10490)
   
   Hi @alamb @wiedld, how's it going? Is there something I can do to help?



Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-03-27 Thread via GitHub


xudong963 commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2758001706

   > [@xudong963](https://github.com/xudong963) see 
[#10336](https://github.com/apache/datafusion/issues/10336), that flag groups 
Parquet scanning to a single partition. `ProgressiveEval` here is proposed to 
preserve the partitioning of the input scan, and to run 2 inputs at a time, so 
would preserve some parallelism.
   
   Got it, thank you



Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-03-26 Thread via GitHub


xudong963 commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2756831956

   > The only reason it is not needed here is because there are fewer files 
than `target_partitions`, so this will not work if we increase the number of 
files or reduce `target_partitions`. If we set `target_partitions` to 1 then it 
requires a sort:
   
   I reread the codebase and agree.
   
   [`FileScanConfig::split_groups_by_statistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569)
 can definitely solve the problem, and then we can remove the unnecessary 
`SortExec`, which would be a significant gain!
   
   One question: is there anything that makes it difficult to turn on 
`split_groups_by_statistics` by default?



Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-03-17 Thread via GitHub


suremarc commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2730732469

   > I think it uses 
[FileGroupPartitioner](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileGroupPartitioner.html)
 that maintains the same ordering... But maybe it is a different ordering than 
you have in mind
   
   AFAICT `FileGroupPartitioner` does maintain the ordering it is given, yes. 
But the initial ordering of the partitions, as determined by `ListingTable`, is 
ultimately just a sort by object store path (see 
[here](https://docs.rs/datafusion-catalog-listing/46.0.0/src/datafusion_catalog_listing/helpers.rs.html#133-136)),
 which may not match the table order. This is the case in my (somewhat 
contrived) example above, where the table order is `id ASC` but `id` decreases 
as `date` increases. 
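
   The mismatch between path order and table order can be sketched with the `t1` data from the 2025-03-13 comment, where each `date=` directory holds one row and `id` falls as `date` rises. `FileMeta`, the helper functions and the `f.parquet` file names are hypothetical; the point is only that sorting by path does not yield `id ASC`, while sorting by min/max statistics does.

   ```rust
   // Hypothetical file metadata: object store path plus min/max of `id`.
   #[derive(Debug)]
   struct FileMeta {
       path: String,
       min_id: i64,
       max_id: i64,
   }

   /// True if reading files in this order yields `id` ascending,
   /// i.e. each file's max is <= the next file's min.
   fn ordered_by_id(files: &[FileMeta]) -> bool {
       files.windows(2).all(|w| w[0].max_id <= w[1].min_id)
   }

   /// Lexicographic path order, analogous to how listed files are ordered.
   fn sort_by_path(files: &mut [FileMeta]) {
       files.sort_by(|a, b| a.path.cmp(&b.path));
   }

   /// Order by the minimum of the sort key taken from file statistics.
   fn sort_by_min_id(files: &mut [FileMeta]) {
       files.sort_by_key(|f| f.min_id);
   }
   ```

   With paths `date=2025-03-01` through `date=2025-03-04` holding `id` values 4, 3, 2, 1, path order gives `id` descending, so `ordered_by_id` is false after `sort_by_path` and true after `sort_by_min_id`.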
   
   For a more realistic example where the table order and object store path 
order don't match, consider a horizontally partitioned table, something like 
this:
   
   ```sql
   DataFusion CLI v46.0.0
   > SET datafusion.execution.target_partitions=2;
   0 row(s) fetched. 
   Elapsed 0.001 seconds.
   
   > CREATE EXTERNAL TABLE t1 (time TIMESTAMP, date DATE, shard INT) STORED AS 
PARQUET LOCATION '/tmp/data/' PARTITIONED BY (date, shard) WITH ORDER (time 
ASC);
   0 row(s) fetched. 
   Elapsed 0.003 seconds.
   
   > INSERT INTO t1 VALUES 
   ('2025-03-01 00:00:01', '2025-03-01', 0), 
   ('2025-03-01 00:00:00', '2025-03-01', 1), 
   ('2025-03-02 00:00:00', '2025-03-02', 0), 
   ('2025-03-02 00:00:02', '2025-03-02', 1);
   +-------+
   | count |
   +-------+
   | 4     |
   +-------+
   1 row(s) fetched. 
   Elapsed 0.011 seconds.
   
   > EXPLAIN SELECT * FROM t1 ORDER BY time ASC;
   
   +---------------+------------------------------------------------------------------------+
   | plan_type     | plan |
   +---------------+------------------------------------------------------------------------+
   | logical_plan  | Sort: t1.time ASC NULLS LAST |
   |               |   TableScan: t1 projection=[time, date, shard] |
   | physical_plan | SortPreservingMergeExec: [time@0 ASC NULLS LAST] |
   |               |   SortExec: expr=[time@0 ASC NULLS LAST], preserve_partitioning=[true] |
   |               |     DataSourceExec: file_groups={2 groups: [[tmp/data/date=2025-03-01/shard=0/8eTZY2WyyhnV7Klv.parquet, tmp/data/date=2025-03-01/shard=1/8eTZY2WyyhnV7Klv.parquet], [tmp/data/date=2025-03-02/shard=0/8eTZY2WyyhnV7Klv.parquet, tmp/data/date=2025-03-02/shard=1/8eTZY2WyyhnV7Klv.parquet]]}, projection=[time, date, shard], file_type=parquet |
   |               | |
   +---------------+------------------------------------------------------------------------+
   ```

Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-03-17 Thread via GitHub


alamb commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2730420780

   > I don't mean to tout my own horn too much, but in fact this exact use case 
is what 
[FileScanConfig::split_groups_by_statistics](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569)
 was written to solve. We can solve the problem locally at the DataSourceExec, 
which I think is the right place to do it. It's still gated behind a feature 
flag, I have not been able to dedicate the time to set up benchmarks for 
ListingTable which I think is required to take this feature out of being 
experimental and ship it.
   
   I agree -- in my mind this is all related. When trying to take maximum 
advantage of pre-existing orderings, I do think the optimizer should be more 
careful.



Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-03-17 Thread via GitHub


alamb commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2730417397

   > Incidentally this is the use case I am targeting. Anyway, this query would 
result in at least 3 partitions, two of which are overlapping in time. If we 
could generalize ProgressiveEval to have multiple partition groupings, we could 
do something like this:
   
   This is a neat idea -- basically a cascade of progressive evals to avoid 
some (but not all) merging
   
   
   > I see that one of the target use cases for ProgressiveEval is 
https://github.com/apache/datafusion/issues/6672. I am a little curious to see 
the implementation, because the way I see it, ProgressiveEval will solve some 
but not all instances of this problem.
   
   Our progressive eval implementation is here 
https://github.com/influxdata/influxdb3_core/blob/26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f/iox_query/src/provider/progressive_eval.rs
   
   (there is a PR to add it to DataFusion here: 
https://github.com/apache/datafusion/pull/10490)
   
   
   > Here is the issue I am worried about. When you use ListingTable you have a 
config option called target_partitions. By default it is set to the number of 
available cpu cores on the system. If the number of files exceeds 
target_partitions then it will start to merge files into the same partitions 
with no guarantees on ordering. Let me demonstrate using datafusion-cli:
   
   > If the number of files exceeds target_partitions then it will start to 
merge files into the same partitions with no guarantees on ordering. 
   
   I think it uses 
[`FileGroupPartitioner`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileGroupPartitioner.html),
 which maintains the same ordering... but maybe that is a different ordering 
than the one you have in mind.
   
   You can define an order using `WITH ORDER` like this:
   
   ```sql
   DataFusion CLI v46.0.1
   > copy (values (4, '2025-03-01')) to '/tmp/test/1.parquet';
   +-------+
   | count |
   +-------+
   | 1     |
   +-------+
   1 row(s) fetched.
   Elapsed 0.004 seconds.
   
   > copy (values (3, '2025-03-02')) to '/tmp/test/2.parquet';
   +-------+
   | count |
   +-------+
   | 1     |
   +-------+
   1 row(s) fetched.
   Elapsed 0.004 seconds.
   
   > create external table test stored as parquet location '/tmp/test' with 
order (column2 ASC);
   0 row(s) fetched.
   Elapsed 0.006 seconds.
   ```
   
   Then the sort is not needed:
   ```sql
   > explain select * from test order by column2 asc;
   
   +---------------+------------------------------------------------------------------------+
   | plan_type     | plan |
   +---------------+------------------------------------------------------------------------+
   | logical_plan  | Sort: test.column2 ASC NULLS LAST |
   |               |   TableScan: test projection=[column1, column2] |
   | physical_plan | SortPreservingMergeExec: [column2@1 ASC NULLS LAST] |
   |               |   DataSourceExec: file_groups={16 groups: [[tmp/test/1.parquet:0..107], [tmp/test/2.parquet:0..107], [tmp/test/1.parquet:107..214], [tmp/test/2.parquet:107..214], [tmp/test/1.parquet:214..321], ...]}, projection=[column1, column2], output_ordering=[column2@1 ASC NULLS LAST], file_type=parquet |
   |               | |
   +---------------+------------------------------------------------------------------------+
   ```
 

Re: [I] Analysis to support`SortPreservingMerge` --> `ProgressiveEval` [datafusion]

2025-03-13 Thread via GitHub


suremarc commented on issue #15191:
URL: https://github.com/apache/datafusion/issues/15191#issuecomment-2722432514

   Thanks for the writeup. The core idea makes sense, but I have a couple of 
comments on the design; in particular, I think we can do better in a few ways. 
   
   # 1. Some but not all partitions overlap
   
   As I understand it, the current implementation gives up if any two 
partitions overlap. This is fine if your target use case involves queries with 
no overlapping partitions, but we can do better. 
   Basically, if we take the proposed algorithm and add a final "first fit" 
step, then instead of producing a single lexical ordering or failing fast, we 
can produce multiple groups of "chained" input partitions, each internally 
ordered by the sort key, and we can optimize the query to use the minimum 
number of chains possible. 
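
   A minimal Rust sketch of that first-fit step, assuming each partition's sort-key range is known as a `(min, max)` pair of `i64`s. `first_fit_chains` is a hypothetical name; the existing code in `FileScanConfig::split_groups_by_statistics` works on `MinMaxStatistics` and Arrow rows rather than plain integers.

   ```rust
   /// Groups partitions into chains of indices. Within a chain, partitions are in
   /// ascending key order and their [min, max] ranges do not overlap, so each
   /// chain can be read by simple concatenation.
   fn first_fit_chains(ranges: &[(i64, i64)]) -> Vec<Vec<usize>> {
       // Visit partitions in order of their minimum key.
       let mut order: Vec<usize> = (0..ranges.len()).collect();
       order.sort_by_key(|&i| ranges[i].0);

       let mut chains: Vec<Vec<usize>> = Vec::new();
       for i in order {
           let (min, _) = ranges[i];
           // First fit: the first chain whose last range ends at or before `min`.
           match chains
               .iter_mut()
               .find(|c| ranges[*c.last().unwrap()].1 <= min)
           {
               Some(chain) => chain.push(i),
               None => chains.push(vec![i]), // no chain fits: open a new one
           }
       }
       chains
   }
   ```

   Visiting partitions in order of their minimum key is what lets the greedy choice use the fewest chains for interval ranges: a new chain is opened only when every existing chain ends past the new partition's start, so all of them overlap at that point. For example, with ranges `(100, 200)`, `(120, 210)` and `(0, 99)` for partitions 0, 1 and 2, the function returns `[[2, 0], [1]]`, the same grouping as the `ProgressiveEval: partitions=[2, 0], [1]` plan in this comment.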
   
   Let me give an example:
   ```sql
   SELECT * FROM recent_table_1
   WHERE time > now() - INTERVAL 1 DAY
   UNION ALL
   SELECT * FROM recent_table_2
   WHERE time > now() - INTERVAL 1 DAY
   UNION ALL
   SELECT * FROM historic_table
   WHERE time < now() - INTERVAL 1 DAY
   ORDER BY time ASC
   ```
   
   Incidentally, this is the use case I am targeting. Anyway, this query would 
result in at least 3 partitions, two of which overlap in `time`. If we could 
generalize `ProgressiveEval` to support multiple partition groupings, we 
could do something like this:
   ```sql
   SortPreservingMergeExec: time ASC
     ProgressiveEval: partitions=[2, 0], [1]
       UnionExec: partitions=[0, 1, 2]
         TableExec: recent_table_1
         TableExec: recent_table_2
         TableExec: historic_table
   ```
   
   This would concatenate partitions 2 and 0, and partition 1 remains 
unchanged. Then a final `SortPreservingMergeExec` is required to merge these 
into one sorted stream. 
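
   That final merge over already-sorted chains can be sketched as a k-way merge with a min-heap. This is a conceptual model of what `SortPreservingMergeExec` does, written over `Vec<i64>` rather than Arrow streams; `merge_sorted` is a hypothetical helper.

   ```rust
   use std::cmp::Reverse;
   use std::collections::BinaryHeap;

   /// Merges chains that are each sorted ascending into one sorted vector.
   fn merge_sorted(chains: &[Vec<i64>]) -> Vec<i64> {
       // Min-heap of (next value, chain index, position within chain).
       let mut heap = BinaryHeap::new();
       for (c, chain) in chains.iter().enumerate() {
           if let Some(&v) = chain.first() {
               heap.push(Reverse((v, c, 0usize)));
           }
       }
       let mut out = Vec::with_capacity(chains.iter().map(Vec::len).sum());
       // Repeatedly emit the smallest head and advance that chain.
       while let Some(Reverse((v, c, pos))) = heap.pop() {
           out.push(v);
           if let Some(&next) = chains[c].get(pos + 1) {
               heap.push(Reverse((next, c, pos + 1)));
           }
       }
       out
   }
   ```

   The heap holds at most one entry per chain, so grouping partitions into fewer chains directly reduces the merge fan-in.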
   
   The "first fit" algorithm has actually already been implemented in 
[`FileScanConfig::split_groups_by_statistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_scan_config.rs#L569),
 which uses the 
[`MinMaxStatistics`](https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/statistics.rs#L64)
 helper to analyze min/max values. However, it was written for `ParquetExec` 
(now deprecated and, I believe, replaced by `DataSourceExec`). 
   
   I believe this change could be retrofitted onto `ProgressiveEval` without 
much additional complexity. The analysis is already implemented; we just need 
to use it. Alternatively, we can take the implementation from influxdb and just 
add the final "first fit" step. 
   
   # 2. Scanning non-overlapping Parquet files
   
   I see that one of the target use cases for `ProgressiveEval` is #6672. I am 
a little curious to see the implementation, because the way I see it, 
`ProgressiveEval` will solve some but not all instances of this problem.
   
   Here is the issue I am worried about. When you use `ListingTable`, there is 
a config option called `target_partitions`, which defaults to the number of 
available CPU cores on the system. If the number of files exceeds 
`target_partitions`, it will start to merge files into the same partitions with 
no guarantees on ordering. Let me demonstrate using `datafusion-cli`:
   
   ```sql
   > SET datafusion.execution.target_partitions=2;
   0 row(s) fetched. 
   Elapsed 0.000 seconds.
   
   > CREATE EXTERNAL TABLE t1 (id INT, date DATE) STORED AS PARQUET LOCATION 
'./data/' PARTITIONED BY (date) WITH ORDER (id ASC);
   0 row(s) fetched. 
   Elapsed 0.002 seconds.
   
   > INSERT INTO t1 VALUES (4, '2025-03-01'), (3, '2025-3-02'), (2, 
'2025-03-03'), (1, '2025-03-04');
   +-------+
   | count |
   +-------+
   | 4     |
   +-------+
   1 row(s) fetched. 
   Elapsed 0.004 seconds.
   
   > EXPLAIN SELECT * FROM t1 ORDER BY id ASC;
   
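   +---------------+------------------------------------------------------------------------+
   | plan_type     | plan |
   +---------------+------------------------------------------------------------------------+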
   | logical_plan  | Sort: t1.id ASC NULLS LAST