xudong963 commented on code in PR #15473:
URL: https://github.com/apache/datafusion/pull/15473#discussion_r2020883570
##########
datafusion/datasource/src/file_scan_config.rs:
##########
@@ -575,6 +575,95 @@ impl FileScanConfig {
})
}
+ /// Splits file groups into new groups based on statistics to enable efficient parallel processing.
+ ///
+ /// The method distributes files across a target number of partitions while ensuring
+ /// files within each partition maintain sort order based on their min/max statistics.
+ ///
+ /// The algorithm works by:
+ /// 1. Sorting all files by their minimum values
+ /// 2. Trying to place each file into an existing group where it can maintain sort order
+ /// 3. Creating new groups when necessary if a file cannot fit into existing groups
+ /// 4. Prioritizing smaller groups when multiple suitable groups exist (for load balancing)
+ ///
+ /// # Parameters
+ /// * `table_schema`: Schema containing information about the columns
+ /// * `file_groups`: The original file groups to split
+ /// * `sort_order`: The lexicographical ordering to maintain within each group
+ /// * `target_partitions`: The desired number of output partitions
+ ///
+ /// # Returns
+ /// A new set of file groups, where files within each group are non-overlapping with respect to
+ /// their min/max statistics and maintain the specified sort order.
+ pub fn split_groups_by_statistics_v2(
+ table_schema: &SchemaRef,
+ file_groups: &[FileGroup],
+ sort_order: &LexOrdering,
+ target_partitions: usize,
+ ) -> Result<Vec<FileGroup>> {
+ let flattened_files = file_groups
+ .iter()
+ .flat_map(FileGroup::iter)
+ .collect::<Vec<_>>();
+
+ if flattened_files.is_empty() {
+ return Ok(vec![]);
+ }
+
+ let statistics = MinMaxStatistics::new_from_files(
+ sort_order,
+ table_schema,
+ None,
+ flattened_files.iter().copied(),
+ )?;
+
+ let indices_sorted_by_min = statistics.min_values_sorted();
+
+ // Initialize with target_partitions empty groups
+ let mut file_groups_indices: Vec<Vec<usize>> =
+ vec![vec![]; target_partitions.max(1)];
+
+ for (idx, min) in indices_sorted_by_min {
+ // Find all groups where the file can fit
+ let mut suitable_groups: Vec<(usize, &mut Vec<usize>)> =
file_groups_indices
+ .iter_mut()
+ .enumerate()
+ .filter(|(_, group)| {
+ group.is_empty()
+ || min
+ > statistics
+ .max(*group.last().expect("groups should not
be empty"))
+ })
+ .collect();
+
+ // Sort by group size to prioritize smaller groups
+ suitable_groups.sort_by_key(|(_, group)| group.len());
Review Comment:
Nice, good to know the API.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]