alamb commented on code in PR #9593: URL: https://github.com/apache/datafusion/pull/9593#discussion_r1585506338
########## datafusion/core/src/datasource/listing/mod.rs: ########## @@ -67,6 +67,11 @@ pub struct PartitionedFile { pub partition_values: Vec<ScalarValue>, /// An optional file range for a more fine-grained parallel execution pub range: Option<FileRange>, + /// Optional statistics that describe the data in this file if known. + /// + /// DataFusion relies on these statistics for planning (in particular to sort file groups), + /// so if they are incorrect, incorrect answers may result. + pub statistics: Option<Statistics>, Review Comment: this is actually a nice API to potentially provide pre-known statistics 👍 ########## datafusion/core/src/datasource/physical_plan/mod.rs: ########## @@ -473,15 +468,281 @@ fn get_projected_output_ordering( // since rest of the orderings are violated break; } + // do not push empty entries // otherwise we may have `Some(vec![])` at the output ordering. - if !new_ordering.is_empty() { - all_orderings.push(new_ordering); + if new_ordering.is_empty() { + continue; + } + + // Check if any file groups are not sorted + if base_config.file_groups.iter().any(|group| { + if group.len() <= 1 { + // File groups with <= 1 files are always sorted + return false; + } + + let statistics = match MinMaxStatistics::new_from_files( + &new_ordering, + projected_schema, + base_config.projection.as_deref(), + group, + ) { + Ok(statistics) => statistics, + Err(e) => { + log::trace!("Error fetching statistics for file group: {e}"); + // we can't prove that it's ordered, so we have to reject it + return true; + } + }; + + !statistics.is_sorted() + }) { + debug!( + "Skipping specified output ordering {:?}. \ + Some file groups couldn't be determined to be sorted: {:?}", + base_config.output_ordering[0], base_config.file_groups + ); + continue; } + + all_orderings.push(new_ordering); } all_orderings } +/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison. Review Comment: Could you please put this structure into its own module (e.g. datafusion/core/src/datasource/physical_plan/statistics.rs) so that it is easier to find ########## datafusion/core/src/datasource/physical_plan/file_scan_config.rs: ########## @@ -770,6 +843,277 @@ mod tests { assert_eq!(projection.fields(), schema.fields()); } + #[test] + fn test_split_groups_by_statistics() -> Result<()> { + use chrono::TimeZone; + use datafusion_common::DFSchema; + use datafusion_expr::execution_props::ExecutionProps; + use object_store::{path::Path, ObjectMeta}; + + struct File { + name: &'static str, + date: &'static str, + statistics: Vec<Option<(f64, f64)>>, + } + impl File { + fn new( + name: &'static str, + date: &'static str, + statistics: Vec<Option<(f64, f64)>>, + ) -> Self { + Self { + name, + date, + statistics, + } + } + } + + struct TestCase { + name: &'static str, + file_schema: Schema, + files: Vec<File>, + sort: Vec<datafusion_expr::Expr>, + expected_result: Result<Vec<Vec<&'static str>>, &'static str>, + } + + use datafusion_expr::col; + let cases = vec![ + TestCase { + name: "test sort", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + false, + )]), + files: vec![ + File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), + File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]), + File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]), + ], + sort: vec![col("value").sort(true, false)], + expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]), + }, + // same input but file '2' is in the middle + // test that we still order correctly + TestCase { + name: "test sort with files ordered differently", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + false, + )]), + files: vec![ + File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), + File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]), + File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]), + ], + sort: vec![col("value").sort(true, false)], + expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]), + }, + TestCase { + name: "reverse sort", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + false, + )]), + files: vec![ + File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), + File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]), + File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]), + ], + sort: vec![col("value").sort(false, true)], + expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]), + }, + // reject nullable sort columns + TestCase { + name: "no nullable sort columns", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + true, // should fail because nullable + )]), + files: vec![ + File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), + File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]), + File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]), + ], + sort: vec![col("value").sort(true, false)], + expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column") + }, + TestCase { + name: "all three non-overlapping", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + false, + )]), + files: vec![ + File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), Review Comment: I think all these tests also always have the first file with the minimum stastistics value -- can you possibly also test what happens when it is not (aka add a test that runs this test with file ids 2, 1, 0)? ########## datafusion/core/src/datasource/physical_plan/mod.rs: ########## @@ -473,15 +468,281 @@ fn get_projected_output_ordering( // since rest of the orderings are violated break; } + // do not push empty entries // otherwise we may have `Some(vec![])` at the output ordering. - if !new_ordering.is_empty() { - all_orderings.push(new_ordering); + if new_ordering.is_empty() { + continue; + } + + // Check if any file groups are not sorted + if base_config.file_groups.iter().any(|group| { + if group.len() <= 1 { + // File groups with <= 1 files are always sorted + return false; + } + + let statistics = match MinMaxStatistics::new_from_files( + &new_ordering, + projected_schema, + base_config.projection.as_deref(), + group, + ) { + Ok(statistics) => statistics, + Err(e) => { + log::trace!("Error fetching statistics for file group: {e}"); + // we can't prove that it's ordered, so we have to reject it + return true; + } + }; + + !statistics.is_sorted() + }) { + debug!( + "Skipping specified output ordering {:?}. \ + Some file groups couldn't be determined to be sorted: {:?}", + base_config.output_ordering[0], base_config.file_groups + ); + continue; } + + all_orderings.push(new_ordering); } all_orderings } +/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison. +/// The min/max values are ordered by [`Self::sort_order`]. +/// Furthermore, any columns that are reversed in the sort order have their min/max values swapped. +pub(crate) struct MinMaxStatistics { + min_by_sort_order: arrow::row::Rows, + max_by_sort_order: arrow::row::Rows, + sort_order: Vec<PhysicalSortExpr>, +} + +impl MinMaxStatistics { + #[allow(unused)] + fn sort_order(&self) -> &[PhysicalSortExpr] { + &self.sort_order + } + + fn new_from_files<'a>( + projected_sort_order: &[PhysicalSortExpr], // Sort order with respect to projected schema + projected_schema: &SchemaRef, // Projected schema + projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns) Review Comment: I didn't see any tests that covered a non `None` projection and I am a little confused about how it could be correct if the projection was in terms of another schema 🤔 ########## datafusion/core/src/datasource/physical_plan/file_scan_config.rs: ########## @@ -770,6 +843,277 @@ mod tests { assert_eq!(projection.fields(), schema.fields()); } + #[test] + fn test_split_groups_by_statistics() -> Result<()> { + use chrono::TimeZone; + use datafusion_common::DFSchema; + use datafusion_expr::execution_props::ExecutionProps; + use object_store::{path::Path, ObjectMeta}; + + struct File { + name: &'static str, + date: &'static str, + statistics: Vec<Option<(f64, f64)>>, + } + impl File { + fn new( + name: &'static str, + date: &'static str, + statistics: Vec<Option<(f64, f64)>>, + ) -> Self { + Self { + name, + date, + statistics, + } + } + } + + struct TestCase { + name: &'static str, + file_schema: Schema, + files: Vec<File>, + sort: Vec<datafusion_expr::Expr>, + expected_result: Result<Vec<Vec<&'static str>>, &'static str>, + } + + use datafusion_expr::col; + let cases = vec![ + TestCase { + name: "test sort", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + false, + )]), + files: vec![ + File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), + File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]), + File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]), + ], + sort: vec![col("value").sort(true, false)], + expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]), + }, + // same input but file '2' is in the middle + // test that we still order correctly + TestCase { + name: "test sort with files ordered differently", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + false, + )]), + files: vec![ + File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), + File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]), + File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]), + ], + sort: vec![col("value").sort(true, false)], + expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]), + }, + TestCase { + name: "reverse sort", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + false, + )]), + files: vec![ + File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), + File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]), + File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]), + ], + sort: vec![col("value").sort(false, true)], + expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]), + }, + // reject nullable sort columns + TestCase { + name: "no nullable sort columns", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + true, // should fail because nullable + )]), + files: vec![ + File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), + File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]), + File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]), + ], + sort: vec![col("value").sort(true, false)], + expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column") + }, + TestCase { + name: "all three non-overlapping", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + false, + )]), + files: vec![ + File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), + File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]), + File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]), + ], + sort: vec![col("value").sort(true, false)], + expected_result: Ok(vec![vec!["0", "1", "2"]]), + }, + TestCase { + name: "all three overlapping", + file_schema: Schema::new(vec![Field::new( + "value".to_string(), + DataType::Float64, + false, + )]), + files: vec![ + File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]), + File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]), + File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]), + ], + sort: vec![col("value").sort(true, false)], + expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]), + }, + TestCase { Review Comment: can we please add a test for a single input file too? ########## datafusion/core/src/datasource/physical_plan/file_scan_config.rs: ########## @@ -194,6 +196,77 @@ impl FileScanConfig { .with_repartition_file_min_size(repartition_file_min_size) .repartition_file_groups(&file_groups) } + + /// Attempts to do a bin-packing on files into file groups, such that any two files + /// in a file group are ordered and non-overlapping with respect to their statistics. + /// It will produce the smallest number of file groups possible. + pub fn split_groups_by_statistics( + table_schema: &SchemaRef, + file_groups: &[Vec<PartitionedFile>], + sort_order: &[PhysicalSortExpr], + ) -> Result<Vec<Vec<PartitionedFile>>> { + let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>(); + // First Fit: + // * Choose the first file group that a file can be placed into. + // * If it fits into no existing file groups, create a new one. + // + // By sorting files by min values and then applying first-fit bin packing, + // we can produce the smallest number of file groups such that + // files within a group are in order and non-overlapping. + // + // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8 + // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html + + if flattened_files.is_empty() { + return Ok(vec![]); + } + + let statistics = MinMaxStatistics::new_from_files( + sort_order, + table_schema, + None, + flattened_files.iter().copied(), + ) + .map_err(|e| { + e.context("construct min/max statistics for split_groups_by_statistics") + })?; + + let indices_sorted_by_min = { Review Comment: I wonder if we could move this into `statistics itself` somehing like ```rust let indices_sorted_by_min = statistics.indices_sorted_by_min() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org