gene-bordegaray opened a new issue, #19090:
URL: https://github.com/apache/datafusion/issues/19090

   ### Is your feature request related to a problem or challenge?
   
   When a user has pre-partitioned data (hive-style), DataFusion should be
able to preserve this partitioning and avoid unnecessary repartitions. For
example, suppose the data is hive-style partitioned by `f_dkey` and ordered by
`(f_dkey, timestamp)`:
   
   ```
   f_dkey=A/data.parquet
   f_dkey=B/data.parquet
   f_dkey=C/data.parquet
   ...
   
   Each table (partitioned by f_dkey and sorted on timestamp):
   | f_dkey | timestamp              | value  |
   |--------|------------------------|--------|
   | A      | 2023-01-01T09:00:00    | 95.5   |
   | A      | 2023-01-01T09:00:10    | 102.3  |
   | A      | 2023-01-01T09:00:20    | 98.7   |
   | A      | 2023-01-01T09:12:20    | 105.1  |
   | A      | 2023-01-01T09:12:30    | 100.0  |
   | A      | 2023-01-01T09:12:40    | 150.0  |
   | A      | 2023-01-01T09:12:50    | 120.8  |
   ```
   
   Running the query:
   ```sql
   EXPLAIN SELECT f_dkey, count(*), avg(value) 
   FROM fact_table_ordered 
   GROUP BY f_dkey 
   ORDER BY f_dkey;
   ```
   would produce the plan:
   ```text
   01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
   02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
   03)----AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
   04)------SortExec: expr=[f_dkey@0 ASC NULLS LAST], preserve_partitioning=[true]
   05)--------CoalesceBatchesExec: target_batch_size=8192
   06)----------RepartitionExec: partitioning=Hash([f_dkey@0], 3), input_partitions=3
   07)------------AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
   08)--------------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
   ```
   
   Because our data is ordered on `(f_dkey, timestamp)`, hash repartitioning
by `f_dkey` loses the sort ordering, which forces a `SortExec` to be inserted
after the repartition. You could set
`datafusion.optimizer.prefer_existing_sort = true;` to preserve the ordering
through the repartition, but with the tradeoff of a more expensive shuffle.
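
To make the ordering loss concrete, here is a minimal Python sketch (not DataFusion code). It assumes a toy routing function, `toy_partition`, and two output partitions so that keys `A` and `C` collide; both are illustrative choices, not taken from the plan above. Rows routed in arrival order end up interleaved, while an order-preserving variant has to merge the sorted input streams, which is roughly where the extra shuffle cost comes from.

```python
import heapq

# Three input partitions, one per f_dkey file, each sorted by (f_dkey, timestamp).
inputs = [
    [("A", 1), ("A", 2)],
    [("B", 1), ("B", 2)],
    [("C", 1), ("C", 2)],
]


def toy_partition(key, n):
    # Hypothetical stand-in for a real hash function; with n=2, "A" and "C" collide.
    return (ord(key) - ord("A")) % n


def repartition_interleaved(inputs, n):
    """Route rows to output partitions in arrival order (round-robin over inputs)."""
    outputs = [[] for _ in range(n)]
    for step in range(max(len(part) for part in inputs)):
        for part in inputs:
            if step < len(part):
                row = part[step]
                outputs[toy_partition(row[0], n)].append(row)
    return outputs


def repartition_merging(inputs, n):
    """Order-preserving variant: split each input by target, then merge sorted streams."""
    outputs = []
    for target in range(n):
        streams = [
            [row for row in part if toy_partition(row[0], n) == target]
            for part in inputs
        ]
        outputs.append(list(heapq.merge(*streams)))
    return outputs


def is_sorted(rows):
    return all(a <= b for a, b in zip(rows, rows[1:]))


interleaved = repartition_interleaved(inputs, 2)
merged = repartition_merging(inputs, 2)
print(is_sorted(interleaved[0]), is_sorted(merged[0]))  # False True
```

In the interleaved case, output partition 0 receives `A` and `C` rows alternately, so a sort is needed afterwards; the merging variant keeps each output sorted at the price of a per-row comparison across inputs.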
   
   Since our data is already partitioned by `f_dkey` at file scan time, we can
eliminate the hash repartitioning, which also eliminates the `SortExec`.
This would result in a plan that looks like:
   ```text
   01)SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST]
   02)--ProjectionExec: expr=[f_dkey@0 as f_dkey, count(Int64(1))@1 as count(*), avg(fact_table_ordered.value)@2 as avg(fact_table_ordered.value)]
   03)----AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey], aggr=[count(Int64(1)), avg(fact_table_ordered.value)], ordering_mode=Sorted
   04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], output_ordering=[f_dkey@1 ASC NULLS LAST], file_type=parquet
   ```
   
   ### Describe the solution you'd like
   
   When a user has partitioned data, introduce the ability to take advantage of
this by setting a new option, `preserve_file_partitioning`, to `true` (`false`
by default). When this is set, DataFusion will preserve partitioning at the
file scan level by creating file groups that follow the file partitioning, and
will represent this by returning `Hash` for its `output_partitioning`.
   
   In turn, operators following this will no longer need to insert hash 
repartitions on our partition key.
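
As an illustration of what "creating file groups that follow the file partitioning" could mean, here is a minimal Python sketch (not DataFusion code). The helper names `partition_value` and `group_files_by_partition`, and the extra file `more.parquet`, are hypothetical. It assumes one file group per distinct partition value; how groups would be formed when there are more distinct values than target partitions is not covered here.

```python
from collections import defaultdict


def partition_value(path, column):
    """Extract the value of a hive-style `column=value` path segment, or None."""
    prefix = column + "="
    for segment in path.split("/"):
        if segment.startswith(prefix):
            return segment[len(prefix):]
    return None


def group_files_by_partition(paths, column):
    """Build one file group per distinct partition value, in sorted key order."""
    groups = defaultdict(list)
    for path in paths:
        value = partition_value(path, column)
        if value is None:
            raise ValueError(f"{path!r} has no {column}= segment")
        groups[value].append(path)
    return [groups[value] for value in sorted(groups)]


files = [
    "fact/f_dkey=A/data.parquet",
    "fact/f_dkey=B/data.parquet",
    "fact/f_dkey=C/data.parquet",
    "fact/f_dkey=A/more.parquet",  # hypothetical second file for key A
]
file_groups = group_files_by_partition(files, "f_dkey")
print(len(file_groups))  # 3
```

Because every file for a given `f_dkey` lands in the same group, each scan partition holds complete key groups, which is the guarantee a downstream aggregation on `f_dkey` needs in order to skip the repartition.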
   
   ### Describe alternatives you've considered
   
   I explored adding a new partitioning type, `KeyPartitioned`, which would be
used when data is explicitly partitioned by certain columns at the data source
level. I explored this idea due to concerns about distinguishing between
existing hash partitioning and this new partitioning type.
   
   After speaking with @gabotechs, @fmonjalet, and @NGA-TRAN, it was decided
that the existing hash semantics satisfy what is being represented through the
file partitioning. Both promise that all rows with a particular value of the
partitioning expression are contained within the same partition.
   
   Furthermore, introducing a new partitioning type would require many more
rippling changes to properly propagate, apply rules to, and handle the new
type. Extending hash functionality to the file scan level takes care of almost
all this work.
   
   ### Additional context
   
   There is some follow-up work that should be discussed and considered:
   - **Superset Partitioning:** currently, `Hash(a)` doesn't satisfy `Hash(a,
b)` although it should. This is because `Hash(a)` guarantees that all rows with
the same value of `a` are contained in a single partition. Rows that agree on
`(a, b)` also agree on `a`, so anything that is partitioned by `Hash(a)` also
satisfies `Hash(a, b)`.
   - **Reduce File I/O with Preserve File Partitioning:** In the current
implementation, when a partition value has many files, all of this file I/O
will go to one task. This is a tradeoff that increases I/O overhead to
eliminate shuffle and sort overhead. There could be ways to increase I/O
parallelism while still maintaining partitioning.
   - **Sort Satisfaction for Monotonic Functions:** If we are sorted by
`timestamp` and then try to order by `date_bin('1 hour', timestamp)`,
DataFusion will not recognize that this is implicitly satisfied. Thus, for
monotonic functions such as `date_bin`, `CAST`, and `FLOOR`, we should maintain
ordering, eliminating unnecessary sorts.
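
The superset partitioning argument can be checked with a small Python sketch (not DataFusion code). The helpers `partition_rows` and `is_colocated` are hypothetical names, and Python's built-in `hash` stands in for the engine's hash function. Data partitioned on `a` keeps every `(a, b)` group in one partition, while the hand-built `by_ab_only` example shows that the converse does not hold.

```python
from collections import defaultdict


def partition_rows(rows, key_columns, n):
    """Assign each row to one of n partitions by hashing the chosen key columns."""
    parts = defaultdict(list)
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        parts[hash(key) % n].append(row)
    return parts


def is_colocated(parts, key_columns):
    """True if every distinct key over key_columns appears in exactly one partition."""
    home = {}
    for pid, rows in parts.items():
        for row in rows:
            key = tuple(row[c] for c in key_columns)
            if home.setdefault(key, pid) != pid:
                return False
    return True


rows = [{"a": a, "b": b} for a in range(4) for b in range(3)]
by_a = partition_rows(rows, ["a"], 3)

# Hand-built layout that is valid for (a, b) but splits a=0 across partitions.
by_ab_only = {0: [{"a": 0, "b": 0}], 1: [{"a": 0, "b": 1}]}

print(is_colocated(by_a, ["a"]), is_colocated(by_a, ["a", "b"]))  # True True
print(is_colocated(by_ab_only, ["a", "b"]), is_colocated(by_ab_only, ["a"]))  # True False
```

The asymmetry is the point of the follow-up: a scan that reports `Hash(a)` could serve an operator requiring `Hash(a, b)`, but not the other way around.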

