This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7c1c7941a0 feat: Determine ordering of file groups (#9593)
7c1c7941a0 is described below
commit 7c1c7941a078153173a6c82bf2c8742bbcbdefa8
Author: Matthew Cramerus <[email protected]>
AuthorDate: Wed May 1 15:37:43 2024 -0500
feat: Determine ordering of file groups (#9593)
* add statistics to PartitionedFile
* just dump work for now
* working test case
* fix jumbled rebase
* forgot to annotate #[test]
* more refactoring
* add a link
* refactor again
* whitespace
* format debug log
* remove useless itertools
* refactor test
* fix bug
* use sort_file_groups in ListingTable
* move check into a better place
* refactor test a bit
* more testing
* more testing
* better error message
* fix log msg
* fix again
* add sqllogictest and fixes
* fix test
* Update datafusion/core/src/datasource/listing/mod.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion/core/src/datasource/physical_plan/file_scan_config.rs
Co-authored-by: Andrew Lamb <[email protected]>
* more unit tests
* rename to split_groups_by_statistics
* only use groups if there's <= target_partitions
* refactor a bit, no need for projected_schema
* fix reverse order
* save work for now
* lots of test cases in new slt
* remove output check
* fix
* fix last test
* comment on params
* clippy
* revert parquet.slt
* no need to pass projection separately
* Update datafusion/core/src/datasource/listing/mod.rs
Co-authored-by: Nga Tran <[email protected]>
* update comment on in
* fix test?
* un-fix?
* add fix back in?
* move indices_sorted_by_min to MinMaxStatistics
* move MinMaxStatistics to its own module
* fix license
* add feature flag
* update config
---------
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Nga Tran <[email protected]>
---
datafusion/common/src/config.rs | 5 +
datafusion/core/src/datasource/file_format/mod.rs | 1 +
datafusion/core/src/datasource/listing/helpers.rs | 1 +
datafusion/core/src/datasource/listing/mod.rs | 12 +-
datafusion/core/src/datasource/listing/table.rs | 39 ++-
.../datasource/physical_plan/file_scan_config.rs | 350 ++++++++++++++++++++-
.../core/src/datasource/physical_plan/mod.rs | 45 ++-
.../src/datasource/physical_plan/parquet/mod.rs | 3 +
.../src/datasource/physical_plan/statistics.rs | 290 +++++++++++++++++
datafusion/core/src/test_util/parquet.rs | 1 +
datafusion/core/tests/parquet/custom_reader.rs | 1 +
datafusion/core/tests/parquet/page_pruning.rs | 1 +
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 17 +
datafusion/proto/src/generated/prost.rs | 2 +
datafusion/proto/src/physical_plan/from_proto.rs | 1 +
datafusion/proto/src/physical_plan/to_proto.rs | 1 +
.../sqllogictest/test_files/information_schema.slt | 2 +
.../test_files/parquet_sorted_statistics.slt | 262 +++++++++++++++
datafusion/substrait/src/physical_plan/consumer.rs | 1 +
docs/source/user-guide/configs.md | 1 +
21 files changed, 1018 insertions(+), 19 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 9ba66dd609..bcaaa68a34 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -297,6 +297,11 @@ config_namespace! {
/// Should DataFusion support recursive CTEs
pub enable_recursive_ctes: bool, default = true
+
+ /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
+ /// statistics into the same file groups.
+ /// Currently experimental
+ pub split_file_groups_by_statistics: bool, default = false
}
}
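
For context, the new flag can be enabled together with statistics collection, which it depends on. A minimal sketch (assuming the standard string-keyed `SessionConfig` setters; not part of this patch):

    use datafusion::prelude::{SessionConfig, SessionContext};

    fn main() {
        // Enable the experimental option (off by default); file statistics
        // must also be collected for the splitting to take effect.
        let config = SessionConfig::new()
            .set_bool("datafusion.execution.split_file_groups_by_statistics", true)
            .set_bool("datafusion.execution.collect_statistics", true);
        let _ctx = SessionContext::new_with_config(config);
    }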
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 5ee0f71867..fdb89a2649 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -150,6 +150,7 @@ pub(crate) mod test_util {
object_meta: meta,
partition_values: vec![],
range: None,
+ statistics: None,
extensions: None,
}]];
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 372f61b1e6..09d9aa8811 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -373,6 +373,7 @@ pub async fn pruned_partition_list<'a>(
object_meta,
partition_values: partition_values.clone(),
range: None,
+ statistics: None,
extensions: None,
})
}));
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index b8c279c8a7..d0361d7b32 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -24,7 +24,7 @@ mod url;
use crate::error::Result;
use chrono::TimeZone;
-use datafusion_common::ScalarValue;
+use datafusion_common::{ScalarValue, Statistics};
use futures::Stream;
use object_store::{path::Path, ObjectMeta};
use std::pin::Pin;
@@ -67,6 +67,11 @@ pub struct PartitionedFile {
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
+ /// Optional statistics that describe the data in this file if known.
+ ///
+ /// DataFusion relies on these statistics for planning (in particular to sort file groups),
+ /// so if they are incorrect, incorrect answers may result.
+ pub statistics: Option<Statistics>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
@@ -83,6 +88,7 @@ impl PartitionedFile {
},
partition_values: vec![],
range: None,
+ statistics: None,
extensions: None,
}
}
@@ -98,7 +104,8 @@ impl PartitionedFile {
version: None,
},
partition_values: vec![],
- range: None,
+ range: Some(FileRange { start, end }),
+ statistics: None,
extensions: None,
}
.with_range(start, end)
@@ -128,6 +135,7 @@ impl From<ObjectMeta> for PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
+ statistics: None,
extensions: None,
}
}
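
To illustrate the new field, a sketch (not from this patch) of a `PartitionedFile` carrying exact min/max statistics for a single f64 column, in the same shape the tests below construct:

    use datafusion::datasource::listing::PartitionedFile;
    use datafusion_common::stats::Precision;
    use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};

    fn main() {
        let mut file = PartitionedFile::new("data/0.parquet".to_string(), 0);
        // Exact min/max for one column; per the doc comment above, incorrect
        // values here can lead to incorrect answers.
        file.statistics = Some(Statistics {
            num_rows: Precision::Absent,
            total_byte_size: Precision::Absent,
            column_statistics: vec![ColumnStatistics {
                min_value: Precision::Exact(ScalarValue::from(0.0_f64)),
                max_value: Precision::Exact(ScalarValue::from(0.49_f64)),
                ..Default::default()
            }],
        });
    }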
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 6ee19828f1..4b1994a179 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -739,16 +739,43 @@ impl TableProvider for ListingTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
- let (partitioned_file_lists, statistics) =
+ let (mut partitioned_file_lists, statistics) =
self.list_files_for_scan(state, filters, limit).await?;
+ let projected_schema = project_schema(&self.schema(), projection)?;
+
// if no files need to be read, return an `EmptyExec`
if partitioned_file_lists.is_empty() {
- let schema = self.schema();
- let projected_schema = project_schema(&schema, projection)?;
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}
+ let output_ordering = self.try_create_output_ordering()?;
+ match state
+ .config_options()
+ .execution
+ .split_file_groups_by_statistics
+ .then(|| {
+ output_ordering.first().map(|output_ordering| {
+ FileScanConfig::split_groups_by_statistics(
+ &self.table_schema,
+ &partitioned_file_lists,
+ output_ordering,
+ )
+ })
+ })
+ .flatten()
+ {
+ Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
+ Some(Ok(new_groups)) => {
+ if new_groups.len() <= self.options.target_partitions {
+ partitioned_file_lists = new_groups;
+ } else {
+ log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
+ }
+ }
+ None => {} // no ordering required
+ };
+
// extract types of partition columns
let table_partition_cols = self
.options
@@ -772,6 +799,7 @@ impl TableProvider for ListingTable {
} else {
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
};
+
// create the execution plan
self.options
.format
@@ -784,7 +812,7 @@ impl TableProvider for ListingTable {
statistics,
projection: projection.cloned(),
limit,
- output_ordering: self.try_create_output_ordering()?,
+ output_ordering,
table_partition_cols,
},
filters.as_ref(),
@@ -937,10 +965,11 @@ impl ListingTable {
// collect the statistics if required by the config
let files = file_list
.map(|part_file| async {
- let part_file = part_file?;
+ let mut part_file = part_file?;
if self.options.collect_stat {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
+ part_file.statistics = Some(statistics.clone());
Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
} else {
Ok((part_file, Statistics::new_unknown(&self.file_schema)))
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 1ea411cb6f..4de7eb136f 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -22,7 +22,9 @@ use std::{
borrow::Cow, collections::HashMap, fmt::Debug, marker::PhantomData,
sync::Arc, vec,
};
-use super::{get_projected_output_ordering, FileGroupPartitioner};
+use super::{
+ get_projected_output_ordering, statistics::MinMaxStatistics, FileGroupPartitioner,
+};
use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl};
use crate::{error::Result, scalar::ScalarValue};
@@ -33,7 +35,7 @@ use arrow_array::{ArrayRef, DictionaryArray, RecordBatch, RecordBatchOptions};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, Statistics};
-use datafusion_physical_expr::LexOrdering;
+use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use log::warn;
@@ -138,12 +140,14 @@ impl FileScanConfig {
column_statistics: table_cols_stats,
};
- let table_schema = Arc::new(
+ let projected_schema = Arc::new(
Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
);
+
let projected_output_ordering =
- get_projected_output_ordering(self, &table_schema);
- (table_schema, table_stats, projected_output_ordering)
+ get_projected_output_ordering(self, &projected_schema);
+
+ (projected_schema, table_stats, projected_output_ordering)
}
#[allow(unused)] // Only used by avro
@@ -194,6 +198,71 @@ impl FileScanConfig {
.with_repartition_file_min_size(repartition_file_min_size)
.repartition_file_groups(&file_groups)
}
+
+ /// Attempts to do a bin-packing on files into file groups, such that any two files
+ /// in a file group are ordered and non-overlapping with respect to their statistics.
+ /// It will produce the smallest number of file groups possible.
+ pub fn split_groups_by_statistics(
+ table_schema: &SchemaRef,
+ file_groups: &[Vec<PartitionedFile>],
+ sort_order: &[PhysicalSortExpr],
+ ) -> Result<Vec<Vec<PartitionedFile>>> {
+ let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
+ // First Fit:
+ // * Choose the first file group that a file can be placed into.
+ // * If it fits into no existing file groups, create a new one.
+ //
+ // By sorting files by min values and then applying first-fit bin packing,
+ // we can produce the smallest number of file groups such that
+ // files within a group are in order and non-overlapping.
+ //
+ // Source: Applied Combinatorics (Keller and Trotter), Chapter 6.8
+ // https://www.appliedcombinatorics.org/book/s_posets_dilworth-intord.html
+
+ if flattened_files.is_empty() {
+ return Ok(vec![]);
+ }
+
+ let statistics = MinMaxStatistics::new_from_files(
+ sort_order,
+ table_schema,
+ None,
+ flattened_files.iter().copied(),
+ )
+ .map_err(|e| {
+ e.context("construct min/max statistics for split_groups_by_statistics")
+ })?;
+
+ let indices_sorted_by_min = statistics.min_values_sorted();
+ let mut file_groups_indices: Vec<Vec<usize>> = vec![];
+
+ for (idx, min) in indices_sorted_by_min {
+ let file_group_to_insert = file_groups_indices.iter_mut().find(|group| {
+ // If our file is non-overlapping and comes _after_ the last file,
+ // it fits in this file group.
+ min > statistics.max(
+ *group
+ .last()
+ .expect("groups should be nonempty at construction"),
+ )
+ });
+ match file_group_to_insert {
+ Some(group) => group.push(idx),
+ None => file_groups_indices.push(vec![idx]),
+ }
+ }
+
+ // Assemble indices back into groups of PartitionedFiles
+ Ok(file_groups_indices
+ .into_iter()
+ .map(|file_group_indices| {
+ file_group_indices
+ .into_iter()
+ .map(|idx| flattened_files[idx].clone())
+ .collect()
+ })
+ .collect())
+ }
}
/// A helper that projects partition columns into the file record batches.
@@ -770,6 +839,277 @@ mod tests {
assert_eq!(projection.fields(), schema.fields());
}
+ #[test]
+ fn test_split_groups_by_statistics() -> Result<()> {
+ use chrono::TimeZone;
+ use datafusion_common::DFSchema;
+ use datafusion_expr::execution_props::ExecutionProps;
+ use object_store::{path::Path, ObjectMeta};
+
+ struct File {
+ name: &'static str,
+ date: &'static str,
+ statistics: Vec<Option<(f64, f64)>>,
+ }
+ impl File {
+ fn new(
+ name: &'static str,
+ date: &'static str,
+ statistics: Vec<Option<(f64, f64)>>,
+ ) -> Self {
+ Self {
+ name,
+ date,
+ statistics,
+ }
+ }
+ }
+
+ struct TestCase {
+ name: &'static str,
+ file_schema: Schema,
+ files: Vec<File>,
+ sort: Vec<datafusion_expr::Expr>,
+ expected_result: Result<Vec<Vec<&'static str>>, &'static str>,
+ }
+
+ use datafusion_expr::col;
+ let cases = vec![
+ TestCase {
+ name: "test sort",
+ file_schema: Schema::new(vec![Field::new(
+ "value".to_string(),
+ DataType::Float64,
+ false,
+ )]),
+ files: vec![
+ File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+ File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+ File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+ ],
+ sort: vec![col("value").sort(true, false)],
+ expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
+ },
+ // same input but file '2' is in the middle
+ // test that we still order correctly
+ TestCase {
+ name: "test sort with files ordered differently",
+ file_schema: Schema::new(vec![Field::new(
+ "value".to_string(),
+ DataType::Float64,
+ false,
+ )]),
+ files: vec![
+ File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+ File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+ File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+ ],
+ sort: vec![col("value").sort(true, false)],
+ expected_result: Ok(vec![vec!["0", "1"], vec!["2"]]),
+ },
+ TestCase {
+ name: "reverse sort",
+ file_schema: Schema::new(vec![Field::new(
+ "value".to_string(),
+ DataType::Float64,
+ false,
+ )]),
+ files: vec![
+ File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+ File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+ File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+ ],
+ sort: vec![col("value").sort(false, true)],
+ expected_result: Ok(vec![vec!["1", "0"], vec!["2"]]),
+ },
+ // reject nullable sort columns
+ TestCase {
+ name: "no nullable sort columns",
+ file_schema: Schema::new(vec![Field::new(
+ "value".to_string(),
+ DataType::Float64,
+ true, // should fail because nullable
+ )]),
+ files: vec![
+ File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+ File::new("1", "2023-01-01", vec![Some((0.50, 1.00))]),
+ File::new("2", "2023-01-02", vec![Some((0.00, 1.00))]),
+ ],
+ sort: vec![col("value").sort(true, false)],
+ expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\nbuild min rows\ncaused by\ncreate sorting columns\ncaused by\nError during planning: cannot sort by nullable column")
+ },
+ TestCase {
+ name: "all three non-overlapping",
+ file_schema: Schema::new(vec![Field::new(
+ "value".to_string(),
+ DataType::Float64,
+ false,
+ )]),
+ files: vec![
+ File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+ File::new("1", "2023-01-01", vec![Some((0.50, 0.99))]),
+ File::new("2", "2023-01-02", vec![Some((1.00, 1.49))]),
+ ],
+ sort: vec![col("value").sort(true, false)],
+ expected_result: Ok(vec![vec!["0", "1", "2"]]),
+ },
+ TestCase {
+ name: "all three overlapping",
+ file_schema: Schema::new(vec![Field::new(
+ "value".to_string(),
+ DataType::Float64,
+ false,
+ )]),
+ files: vec![
+ File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+ File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
+ File::new("2", "2023-01-02", vec![Some((0.00, 0.49))]),
+ ],
+ sort: vec![col("value").sort(true, false)],
+ expected_result: Ok(vec![vec!["0"], vec!["1"], vec!["2"]]),
+ },
+ TestCase {
+ name: "empty input",
+ file_schema: Schema::new(vec![Field::new(
+ "value".to_string(),
+ DataType::Float64,
+ false,
+ )]),
+ files: vec![],
+ sort: vec![col("value").sort(true, false)],
+ expected_result: Ok(vec![]),
+ },
+ TestCase {
+ name: "one file missing statistics",
+ file_schema: Schema::new(vec![Field::new(
+ "value".to_string(),
+ DataType::Float64,
+ false,
+ )]),
+ files: vec![
+ File::new("0", "2023-01-01", vec![Some((0.00, 0.49))]),
+ File::new("1", "2023-01-01", vec![Some((0.00, 0.49))]),
+ File::new("2", "2023-01-02", vec![None]),
+ ],
+ sort: vec![col("value").sort(true, false)],
+ expected_result: Err("construct min/max statistics for split_groups_by_statistics\ncaused by\ncollect min/max values\ncaused by\nget min/max for column: 'value'\ncaused by\nError during planning: statistics not found"),
+ },
+ ];
+
+ for case in cases {
+ let table_schema = Arc::new(Schema::new(
+ case.file_schema
+ .fields()
+ .clone()
+ .into_iter()
+ .cloned()
+ .chain(Some(Arc::new(Field::new(
+ "date".to_string(),
+ DataType::Utf8,
+ false,
+ ))))
+ .collect::<Vec<_>>(),
+ ));
+ let sort_order = case
+ .sort
+ .into_iter()
+ .map(|expr| {
+ crate::physical_planner::create_physical_sort_expr(
+ &expr,
+ &DFSchema::try_from(table_schema.as_ref().clone())?,
+ &ExecutionProps::default(),
+ )
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ let partitioned_files =
+ case.files.into_iter().map(From::from).collect::<Vec<_>>();
+ let result = FileScanConfig::split_groups_by_statistics(
+ &table_schema,
+ &[partitioned_files.clone()],
+ &sort_order,
+ );
+ let results_by_name = result
+ .as_ref()
+ .map(|file_groups| {
+ file_groups
+ .iter()
+ .map(|file_group| {
+ file_group
+ .iter()
+ .map(|file| {
+ partitioned_files
+ .iter()
+ .find_map(|f| {
if f.object_meta == file.object_meta {
+ Some(
+ f.object_meta
+ .location
+ .as_ref()
+ .rsplit('/')
+ .next()
+ .unwrap()
+ .trim_end_matches(".parquet"),
+ )
+ } else {
+ None
+ }
+ })
+ .unwrap()
+ })
+ .collect::<Vec<_>>()
+ })
+ .collect::<Vec<_>>()
+ })
+ .map_err(|e| e.to_string().leak() as &'static str);
+
+ assert_eq!(results_by_name, case.expected_result, "{}", case.name);
+ }
+
+ return Ok(());
+
+ impl From<File> for PartitionedFile {
+ fn from(file: File) -> Self {
+ PartitionedFile {
+ object_meta: ObjectMeta {
+ location: Path::from(format!(
+ "data/date={}/{}.parquet",
+ file.date, file.name
+ )),
+ last_modified: chrono::Utc.timestamp_nanos(0),
+ size: 0,
+ e_tag: None,
+ version: None,
+ },
+ partition_values: vec![ScalarValue::from(file.date)],
+ range: None,
+ statistics: Some(Statistics {
+ num_rows: Precision::Absent,
+ total_byte_size: Precision::Absent,
+ column_statistics: file
+ .statistics
+ .into_iter()
+ .map(|stats| {
+ stats
+ .map(|(min, max)| ColumnStatistics {
+ min_value: Precision::Exact(ScalarValue::from(
+ min,
+ )),
+ max_value: Precision::Exact(ScalarValue::from(
+ max,
+ )),
+ ..Default::default()
+ })
+ .unwrap_or_default()
+ })
+ .collect::<Vec<_>>(),
+ }),
+ extensions: None,
+ }
+ }
+ }
+ }
+
// sets default for configs that play no role in projections
fn config_for_projection(
file_schema: SchemaRef,
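
As a standalone illustration of the first-fit grouping described in the comment inside `split_groups_by_statistics`, here is a sketch on plain integer (min, max) ranges (the `first_fit` helper is hypothetical, standing in for the `MinMaxStatistics`-based code above):

    // First-fit: place each file (sorted by min) into the first group whose
    // last file ends strictly before this file begins; otherwise open a group.
    fn first_fit(mut files: Vec<(i64, i64)>) -> Vec<Vec<(i64, i64)>> {
        files.sort_by_key(|&(min, _)| min);
        let mut groups: Vec<Vec<(i64, i64)>> = vec![];
        for file in files {
            match groups.iter_mut().find(|g| g.last().unwrap().1 < file.0) {
                Some(group) => group.push(file),
                None => groups.push(vec![file]),
            }
        }
        groups
    }

    fn main() {
        // Mirrors the "test sort" case: files 0 and 1 chain; file 2 overlaps both.
        let groups = first_fit(vec![(0, 49), (50, 100), (0, 100)]);
        assert_eq!(groups, vec![vec![(0, 49), (50, 100)], vec![(0, 100)]]);
    }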
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index ddb8d032f3..c450774572 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -26,6 +26,7 @@ mod file_stream;
mod json;
#[cfg(feature = "parquet")]
pub mod parquet;
+mod statistics;
pub(crate) use self::csv::plan_to_csv;
pub(crate) use self::json::plan_to_json;
@@ -451,11 +452,6 @@ fn get_projected_output_ordering(
) -> Vec<Vec<PhysicalSortExpr>> {
let mut all_orderings = vec![];
for output_ordering in &base_config.output_ordering {
- if base_config.file_groups.iter().any(|group| group.len() > 1) {
- debug!("Skipping specified output ordering {:?}. Some file group had more than one file: {:?}",
- base_config.output_ordering[0], base_config.file_groups);
- return vec![];
- }
let mut new_ordering = vec![];
for PhysicalSortExpr { expr, options } in output_ordering {
if let Some(col) = expr.as_any().downcast_ref::<Column>() {
@@ -473,11 +469,45 @@ fn get_projected_output_ordering(
// since rest of the orderings are violated
break;
}
+
// do not push empty entries
// otherwise we may have `Some(vec![])` at the output ordering.
- if !new_ordering.is_empty() {
- all_orderings.push(new_ordering);
+ if new_ordering.is_empty() {
+ continue;
+ }
+
+ // Check if any file groups are not sorted
+ if base_config.file_groups.iter().any(|group| {
+ if group.len() <= 1 {
+ // File groups with <= 1 files are always sorted
+ return false;
+ }
+
+ let statistics = match statistics::MinMaxStatistics::new_from_files(
+ &new_ordering,
+ projected_schema,
+ base_config.projection.as_deref(),
+ group,
+ ) {
+ Ok(statistics) => statistics,
+ Err(e) => {
+ log::trace!("Error fetching statistics for file group: {e}");
+ // we can't prove that it's ordered, so we have to reject it
+ return true;
+ }
+ };
+
+ !statistics.is_sorted()
+ }) {
+ debug!(
+ "Skipping specified output ordering {:?}. \
+ Some file groups couldn't be determined to be sorted: {:?}",
+ base_config.output_ordering[0], base_config.file_groups
+ );
+ continue;
}
+
+ all_orderings.push(new_ordering);
}
all_orderings
}
@@ -861,6 +891,7 @@ mod tests {
object_meta,
partition_values: vec![],
range: None,
+ statistics: None,
extensions: None,
}
}
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 73fb82980f..b286b0f746 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -1538,6 +1538,7 @@ mod tests {
object_meta: meta.clone(),
partition_values: vec![],
range: Some(FileRange { start, end }),
+ statistics: None,
extensions: None,
}
}
@@ -1639,6 +1640,7 @@ mod tests {
),
],
range: None,
+ statistics: None,
extensions: None,
};
@@ -1733,6 +1735,7 @@ mod tests {
},
partition_values: vec![],
range: None,
+ statistics: None,
extensions: None,
};
diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs b/datafusion/core/src/datasource/physical_plan/statistics.rs
new file mode 100644
index 0000000000..e1c61ec1a7
--- /dev/null
+++ b/datafusion/core/src/datasource/physical_plan/statistics.rs
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+ *
+ * Use statistics to optimize physical planning.
+ *
+ * Currently, this module houses code to sort file groups if they are non-overlapping with
+ * respect to the required sort order. See [`MinMaxStatistics`]
+ *
+*/
+
+use std::sync::Arc;
+
+use arrow::{
+ compute::SortColumn,
+ row::{Row, Rows},
+};
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
+
+use crate::datasource::listing::PartitionedFile;
+
+/// A normalized representation of file min/max statistics that allows for efficient sorting & comparison.
+/// The min/max values are ordered by [`Self::sort_order`].
+/// Furthermore, any columns that are reversed in the sort order have their min/max values swapped.
+pub(crate) struct MinMaxStatistics {
+ min_by_sort_order: Rows,
+ max_by_sort_order: Rows,
+ sort_order: Vec<PhysicalSortExpr>,
+}
+
+impl MinMaxStatistics {
+ /// Sort order used to sort the statistics
+ #[allow(unused)]
+ pub fn sort_order(&self) -> &[PhysicalSortExpr] {
+ &self.sort_order
+ }
+
+ /// Min value at index
+ #[allow(unused)]
+ pub fn min(&self, idx: usize) -> Row {
+ self.min_by_sort_order.row(idx)
+ }
+
+ /// Max value at index
+ pub fn max(&self, idx: usize) -> Row {
+ self.max_by_sort_order.row(idx)
+ }
+
+ pub fn new_from_files<'a>(
+ projected_sort_order: &[PhysicalSortExpr], // Sort order with respect to projected schema
+ projected_schema: &SchemaRef, // Projected schema
+ projection: Option<&[usize]>, // Indices of projection in full table schema (None = all columns)
+ files: impl IntoIterator<Item = &'a PartitionedFile>,
+ ) -> Result<Self> {
+ use datafusion_common::ScalarValue;
+
+ let statistics_and_partition_values = files
+ .into_iter()
+ .map(|file| {
+ file.statistics
+ .as_ref()
+ .zip(Some(file.partition_values.as_slice()))
+ })
+ .collect::<Option<Vec<_>>>()
+ .ok_or_else(|| {
DataFusionError::Plan("Parquet file missing statistics".to_string())
+ })?;
+
+ // Helper function to get min/max statistics for a given column of projected_schema
+ let get_min_max = |i: usize| -> Result<(Vec<ScalarValue>, Vec<ScalarValue>)> {
+ Ok(statistics_and_partition_values
+ .iter()
+ .map(|(s, pv)| {
+ if i < s.column_statistics.len() {
+ s.column_statistics[i]
+ .min_value
+ .get_value()
+ .cloned()
+ .zip(s.column_statistics[i].max_value.get_value().cloned())
+ .ok_or_else(|| {
DataFusionError::Plan("statistics not found".to_string())
+ })
+ } else {
+ let partition_value = &pv[i - s.column_statistics.len()];
+ Ok((partition_value.clone(), partition_value.clone()))
+ }
+ })
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .unzip())
+ };
+
+ let sort_columns = sort_columns_from_physical_sort_exprs(projected_sort_order)
+ .ok_or(DataFusionError::Plan(
+ "sort expression must be on column".to_string(),
+ ))?;
+
+ // Project the schema & sort order down to just the relevant columns
+ let min_max_schema = Arc::new(
+ projected_schema
+ .project(&(sort_columns.iter().map(|c| c.index()).collect::<Vec<_>>()))?,
+ );
+ let min_max_sort_order = sort_columns
+ .iter()
+ .zip(projected_sort_order.iter())
+ .enumerate()
+ .map(|(i, (col, sort))| PhysicalSortExpr {
+ expr: Arc::new(Column::new(col.name(), i)),
+ options: sort.options,
+ })
+ .collect::<Vec<_>>();
+
+ let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
+ .iter()
+ .map(|c| {
+ // Reverse the projection to get the index of the column in the full statistics
+ // The file statistics contains _every_ column, but the sort column's index()
+ // refers to the index in projected_schema
+ let i = projection.map(|p| p[c.index()]).unwrap_or(c.index());
+
+ let (min, max) = get_min_max(i).map_err(|e| {
e.context(format!("get min/max for column: '{}'", c.name()))
+ })?;
+ Ok((
+ ScalarValue::iter_to_array(min)?,
+ ScalarValue::iter_to_array(max)?,
+ ))
+ })
+ .collect::<Result<Vec<_>>>()
+ .map_err(|e| e.context("collect min/max values"))?
+ .into_iter()
+ .unzip();
+
+ Self::new(
+ &min_max_sort_order,
+ &min_max_schema,
+ RecordBatch::try_new(Arc::clone(&min_max_schema), min_values).map_err(
+ |e| {
+ DataFusionError::ArrowError(e, Some("\ncreate min batch".to_string()))
+ },
+ )?,
+ RecordBatch::try_new(Arc::clone(&min_max_schema), max_values).map_err(
+ |e| {
+ DataFusionError::ArrowError(e, Some("\ncreate max batch".to_string()))
+ },
+ )?,
+ )
+ }
+
+ pub fn new(
+ sort_order: &[PhysicalSortExpr],
+ schema: &SchemaRef,
+ min_values: RecordBatch,
+ max_values: RecordBatch,
+ ) -> Result<Self> {
+ use arrow::row::*;
+
+ let sort_fields = sort_order
+ .iter()
+ .map(|expr| {
+ expr.expr
+ .data_type(schema)
+ .map(|data_type| SortField::new_with_options(data_type, expr.options))
+ })
+ .collect::<Result<Vec<_>>>()
+ .map_err(|e| e.context("create sort fields"))?;
+ let converter = RowConverter::new(sort_fields)?;
+
+ let sort_columns = sort_columns_from_physical_sort_exprs(sort_order).ok_or(
+ DataFusionError::Plan("sort expression must be on column".to_string()),
+ )?;
+
+ // swap min/max if they're reversed in the ordering
+ let (new_min_cols, new_max_cols): (Vec<_>, Vec<_>) = sort_order
+ .iter()
+ .zip(sort_columns.iter().copied())
+ .map(|(sort_expr, column)| {
+ if sort_expr.options.descending {
+ max_values
+ .column_by_name(column.name())
+ .zip(min_values.column_by_name(column.name()))
+ } else {
+ min_values
+ .column_by_name(column.name())
+ .zip(max_values.column_by_name(column.name()))
+ }
+ .ok_or_else(|| {
+ DataFusionError::Plan(format!(
+ "missing column in MinMaxStatistics::new: '{}'",
+ column.name()
+ ))
+ })
+ })
+ .collect::<Result<Vec<_>>>()?
+ .into_iter()
+ .unzip();
+
+ let [min, max] = [new_min_cols, new_max_cols].map(|cols| {
+ let values = RecordBatch::try_new(
+ min_values.schema(),
+ cols.into_iter().cloned().collect(),
+ )?;
+ let sorting_columns = sort_order
+ .iter()
+ .zip(sort_columns.iter().copied())
+ .map(|(sort_expr, column)| {
+ let schema = values.schema();
+
+ let idx = schema.index_of(column.name())?;
+ let field = schema.field(idx);
+
+ // check that sort columns are non-nullable
+ if field.is_nullable() {
+ return Err(DataFusionError::Plan(
+ "cannot sort by nullable column".to_string(),
+ ));
+ }
+
+ Ok(SortColumn {
+ values: Arc::clone(values.column(idx)),
+ options: Some(sort_expr.options),
+ })
+ })
+ .collect::<Result<Vec<_>>>()
+ .map_err(|e| e.context("create sorting columns"))?;
+ converter
+ .convert_columns(
+ &sorting_columns
+ .into_iter()
+ .map(|c| c.values)
+ .collect::<Vec<_>>(),
+ )
+ .map_err(|e| {
+ DataFusionError::ArrowError(e, Some("convert columns".to_string()))
+ })
+ });
+
+ Ok(Self {
+ min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
+ max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
+ sort_order: sort_order.to_vec(),
+ })
+ }
+
+ /// Return a sorted list of the min statistics together with the original indices
+ pub fn min_values_sorted(&self) -> Vec<(usize, Row<'_>)> {
+ let mut sort: Vec<_> = self.min_by_sort_order.iter().enumerate().collect();
+ sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
+ sort
+ }
+
+ /// Check if the min/max statistics are in order and non-overlapping
+ pub fn is_sorted(&self) -> bool {
+ self.max_by_sort_order
+ .iter()
+ .zip(self.min_by_sort_order.iter().skip(1))
+ .all(|(max, next_min)| max < next_min)
+ }
+}
+
+fn sort_columns_from_physical_sort_exprs(
+ sort_order: &[PhysicalSortExpr],
+) -> Option<Vec<&datafusion_physical_plan::expressions::Column>> {
+ sort_order
+ .iter()
+ .map(|expr| {
+ expr.expr
+ .as_any()
+ .downcast_ref::<datafusion_physical_expr::expressions::Column>()
+ })
+ .collect::<Option<Vec<_>>>()
+}
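
A distilled sketch of the ordered/non-overlapping check `is_sorted` performs, using plain (min, max) pairs in place of normalized `Rows` (hypothetical helper, not part of the patch):

    // Sorted iff each file's max is strictly below the next file's min.
    fn is_sorted(ranges: &[(i64, i64)]) -> bool {
        ranges
            .iter()
            .zip(ranges.iter().skip(1))
            .all(|((_, max), (next_min, _))| max < next_min)
    }

    fn main() {
        assert!(is_sorted(&[(0, 49), (50, 100)]));   // chained: ordered
        assert!(!is_sorted(&[(0, 100), (50, 150)])); // overlapping: rejected
    }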
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index 8113d799a1..f949058769 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -151,6 +151,7 @@ impl TestParquetFile {
object_meta: self.object_meta.clone(),
partition_values: vec![],
range: None,
+ statistics: None,
extensions: None,
}]],
statistics: Statistics::new_unknown(&self.schema),
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 4bacc80579..e4f4d229c4 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -69,6 +69,7 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
object_meta: meta,
partition_values: vec![],
range: None,
+ statistics: None,
extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
})
.collect();
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index ccaa65b7ee..8f42f21834 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -62,6 +62,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
object_meta: meta,
partition_values: vec![],
range: None,
+ statistics: None,
extensions: None,
};
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index c2653fa96f..315c1449b1 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1692,6 +1692,7 @@ message PartitionedFile {
uint64 last_modified_ns = 3;
repeated ScalarValue partition_values = 4;
FileRange range = 5;
+ Statistics statistics = 6;
}
message FileRange {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 0fb6f46237..e28ac6b835 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -17451,6 +17451,9 @@ impl serde::Serialize for PartitionedFile {
if self.range.is_some() {
len += 1;
}
+ if self.statistics.is_some() {
+ len += 1;
+ }
let mut struct_ser = serializer.serialize_struct("datafusion.PartitionedFile", len)?;
if !self.path.is_empty() {
struct_ser.serialize_field("path", &self.path)?;
@@ -17469,6 +17472,9 @@ impl serde::Serialize for PartitionedFile {
if let Some(v) = self.range.as_ref() {
struct_ser.serialize_field("range", v)?;
}
+ if let Some(v) = self.statistics.as_ref() {
+ struct_ser.serialize_field("statistics", v)?;
+ }
struct_ser.end()
}
}
@@ -17486,6 +17492,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
"partition_values",
"partitionValues",
"range",
+ "statistics",
];
#[allow(clippy::enum_variant_names)]
@@ -17495,6 +17502,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
LastModifiedNs,
PartitionValues,
Range,
+ Statistics,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -17521,6 +17529,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
"lastModifiedNs" | "last_modified_ns" =>
Ok(GeneratedField::LastModifiedNs),
"partitionValues" | "partition_values" =>
Ok(GeneratedField::PartitionValues),
"range" => Ok(GeneratedField::Range),
+ "statistics" => Ok(GeneratedField::Statistics),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
@@ -17545,6 +17554,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
let mut last_modified_ns__ = None;
let mut partition_values__ = None;
let mut range__ = None;
+ let mut statistics__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Path => {
@@ -17581,6 +17591,12 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
}
range__ = map_.next_value()?;
}
+ GeneratedField::Statistics => {
+ if statistics__.is_some() {
+ return Err(serde::de::Error::duplicate_field("statistics"));
+ }
+ statistics__ = map_.next_value()?;
+ }
}
}
Ok(PartitionedFile {
@@ -17589,6 +17605,7 @@ impl<'de> serde::Deserialize<'de> for PartitionedFile {
last_modified_ns: last_modified_ns__.unwrap_or_default(),
partition_values: partition_values__.unwrap_or_default(),
range: range__,
+ statistics: statistics__,
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index a7aa73d1b6..c327b3ad84 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2694,6 +2694,8 @@ pub struct PartitionedFile {
pub partition_values: ::prost::alloc::vec::Vec<ScalarValue>,
#[prost(message, optional, tag = "5")]
pub range: ::core::option::Option<FileRange>,
+ #[prost(message, optional, tag = "6")]
+ pub statistics: ::core::option::Option<Statistics>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index e9728d8542..1fcdafa530 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -535,6 +535,7 @@ impl TryFrom<&protobuf::PartitionedFile> for PartitionedFile {
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: val.range.as_ref().map(|v| v.try_into()).transpose()?,
+ statistics: val.statistics.as_ref().map(|v| v.try_into()).transpose()?,
extensions: None,
})
}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index a6af5e0bb5..dde0821b0b 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -612,6 +612,7 @@ impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
.map(|v| v.try_into())
.collect::<Result<Vec<_>, _>>()?,
range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
+ statistics: pf.statistics.as_ref().map(|s| s.into()),
})
}
}
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index c64279ede0..e030015fa5 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -200,6 +200,7 @@ datafusion.execution.planning_concurrency 13
datafusion.execution.soft_max_rows_per_output_file 50000000
datafusion.execution.sort_in_place_threshold_bytes 1048576
datafusion.execution.sort_spill_reservation_bytes 10485760
+datafusion.execution.split_file_groups_by_statistics false
datafusion.execution.target_partitions 7
datafusion.execution.time_zone +00:00
datafusion.explain.logical_plan_only false
@@ -279,6 +280,7 @@ datafusion.execution.planning_concurrency 13 Fan-out during initial physical pla
datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
+datafusion.execution.split_file_groups_by_statistics false Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental
datafusion.execution.target_partitions 7 Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system
datafusion.execution.time_zone +00:00 The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour
datafusion.explain.logical_plan_only false When set to true, the explain statement will only print logical plans
diff --git a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt
new file mode 100644
index 0000000000..f7a81f0845
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt
@@ -0,0 +1,262 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# TESTS FOR SORTED PARQUET FILES
+
+# Set 2 partitions for deterministic output plans
+statement ok
+set datafusion.execution.target_partitions = 2;
+
+# Collect statistics -- used for sorting files
+statement ok
+set datafusion.execution.collect_statistics = true;
+
+# Enable split_file_groups_by_statistics since it's currently disabled by default
+statement ok
+set datafusion.execution.split_file_groups_by_statistics = true;
+
+# Create a table as a data source
+statement ok
+CREATE TABLE src_table (
+ int_col INT,
+ descending_col INT,
+ string_col TEXT,
+ bigint_col BIGINT,
+ date_col DATE,
+ overlapping_col INT,
+ constant_col INT
+) AS VALUES
+-- first file
+(1, 3, 'aaa', 100, 1, 0, 0),
+(2, 2, 'bbb', 200, 2, 1, 0),
+(3, 1, 'ccc', 300, 3, 2, 0),
+-- second file
+(4, 6, 'ddd', 400, 4, 0, 0),
+(5, 5, 'eee', 500, 5, 1, 0),
+(6, 4, 'fff', 600, 6, 2, 0),
+-- third file
+(7, 9, 'ggg', 700, 7, 3, 0),
+(8, 8, 'hhh', 800, 8, 4, 0),
+(9, 7, 'iii', 900, 9, 5, 0);
+
+# Setup 3 files, in particular more files than there are partitions
+
+# File 1:
+query IITIDII
+COPY (SELECT * FROM src_table ORDER BY int_col LIMIT 3)
+TO 'test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet'
+STORED AS PARQUET;
+----
+3
+
+# File 2:
+query IITIDII
+COPY (SELECT * FROM src_table WHERE int_col > 3 ORDER BY int_col LIMIT 3)
+TO 'test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet'
+STORED AS PARQUET;
+----
+3
+
+# Add another file to the directory underlying test_table
+query IITIDII
+COPY (SELECT * FROM src_table WHERE int_col > 6 ORDER BY int_col LIMIT 3)
+TO 'test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet'
+STORED AS PARQUET;
+----
+3
+
+
+# Create a table from generated parquet files:
+statement ok
+CREATE EXTERNAL TABLE test_table (
+ partition_col TEXT NOT NULL,
+ int_col INT NOT NULL,
+ descending_col INT NOT NULL,
+ string_col TEXT NOT NULL,
+ bigint_col BIGINT NOT NULL,
+ date_col DATE NOT NULL,
+ overlapping_col INT NOT NULL,
+ constant_col INT NOT NULL
+)
+STORED AS PARQUET
+PARTITIONED BY (partition_col)
+WITH ORDER (int_col ASC NULLS LAST, bigint_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';
+
+# Order by numeric columns
+# This is to exercise file group sorting, which uses file-level statistics
+# DataFusion doesn't currently support string column statistics
+# This should not require a sort.
+query TT
+EXPLAIN SELECT int_col, bigint_col
+FROM test_table
+ORDER BY int_col, bigint_col;
+----
+logical_plan
+01)Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
+02)--TableScan: test_table projection=[int_col, bigint_col]
+physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[int_col, bigint_col], output_ordering=[int_col@0 ASC NULLS L [...]
+
+# Another planning test, but project on a column with unsupported statistics
+# We should be able to ignore this and look at only the relevant statistics
+query TT
+EXPLAIN SELECT string_col
+FROM test_table
+ORDER BY int_col, bigint_col;
+----
+logical_plan
+01)Projection: test_table.string_col
+02)--Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
+03)----Projection: test_table.string_col, test_table.int_col, test_table.bigint_col
+04)------TableScan: test_table projection=[int_col, string_col, bigint_col]
+physical_plan
+01)ProjectionExec: expr=[string_col@1 as string_col]
+02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[int_col, string_col, bigint_col], output_ordering=[int_col@0 ASC NULL [...]
+
+# Clean up & recreate but sort on descending column
+statement ok
+DROP TABLE test_table;
+
+statement ok
+CREATE EXTERNAL TABLE test_table (
+ partition_col TEXT NOT NULL,
+ int_col INT NOT NULL,
+ descending_col INT NOT NULL,
+ string_col TEXT NOT NULL,
+ bigint_col BIGINT NOT NULL,
+ date_col DATE NOT NULL,
+ overlapping_col INT NOT NULL,
+ constant_col INT NOT NULL
+)
+STORED AS PARQUET
+PARTITIONED BY (partition_col)
+WITH ORDER (descending_col DESC NULLS LAST, bigint_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';
+
+# Query order by descending_col
+# This should order the files like [C, B, A]
+query TT
+EXPLAIN SELECT descending_col, bigint_col
+FROM test_table
+ORDER BY descending_col DESC NULLS LAST, bigint_col ASC NULLS LAST;
+----
+logical_plan
+01)Sort: test_table.descending_col DESC NULLS LAST, test_table.bigint_col ASC NULLS LAST
+02)--TableScan: test_table projection=[descending_col, bigint_col]
+physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet]]}, projection=[descending_col, bigint_col], output_ordering=[descending_col [...]
+
+# Clean up & re-create with partition columns in sort order
+statement ok
+DROP TABLE test_table;
+
+statement ok
+CREATE EXTERNAL TABLE test_table (
+ partition_col TEXT NOT NULL,
+ int_col INT NOT NULL,
+ descending_col INT NOT NULL,
+ string_col TEXT NOT NULL,
+ bigint_col BIGINT NOT NULL,
+ date_col DATE NOT NULL,
+ overlapping_col INT NOT NULL,
+ constant_col INT NOT NULL
+)
+STORED AS PARQUET
+PARTITIONED BY (partition_col)
+WITH ORDER (partition_col ASC NULLS LAST, int_col ASC NULLS LAST, bigint_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';
+
+# Order with partition column first
+# In particular, the partition column is a string
+# Even though statistics for string columns are not supported,
+# string partition columns are common and we do support sorting file groups on them
+query TT
+EXPLAIN SELECT int_col, bigint_col, partition_col
+FROM test_table
+ORDER BY partition_col, int_col, bigint_col;
+----
+logical_plan
+01)Sort: test_table.partition_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST
+02)--TableScan: test_table projection=[int_col, bigint_col, partition_col]
+physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[int_col, bigint_col, partition_col], output_ordering=[partit [...]
+
+# Clean up & re-create with overlapping column in sort order
+# This will test the ability to sort files with overlapping statistics
+statement ok
+DROP TABLE test_table;
+
+statement ok
+CREATE EXTERNAL TABLE test_table (
+ partition_col TEXT NOT NULL,
+ int_col INT NOT NULL,
+ descending_col INT NOT NULL,
+ string_col TEXT NOT NULL,
+ bigint_col BIGINT NOT NULL,
+ date_col DATE NOT NULL,
+ overlapping_col INT NOT NULL,
+ constant_col INT NOT NULL
+)
+STORED AS PARQUET
+PARTITIONED BY (partition_col)
+WITH ORDER (overlapping_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';
+
+query TT
+EXPLAIN SELECT int_col, bigint_col, overlapping_col
+FROM test_table
+ORDER BY overlapping_col;
+----
+logical_plan
+01)Sort: test_table.overlapping_col ASC NULLS LAST
+02)--TableScan: test_table projection=[int_col, bigint_col, overlapping_col]
+physical_plan
+01)SortPreservingMergeExec: [overlapping_col@2 ASC NULLS LAST]
+02)--ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, overlapping_col], output_ordering=[overlappin [...]
+
+# Clean up & re-create with constant column in sort order
+# This will require a sort because the # of required file groups (3)
+# exceeds the # of target partitions (2)
+statement ok
+DROP TABLE test_table;
+
+statement ok
+CREATE EXTERNAL TABLE test_table (
+ partition_col TEXT NOT NULL,
+ int_col INT NOT NULL,
+ descending_col INT NOT NULL,
+ string_col TEXT NOT NULL,
+ bigint_col BIGINT NOT NULL,
+ date_col DATE NOT NULL,
+ overlapping_col INT NOT NULL,
+ constant_col INT NOT NULL
+)
+STORED AS PARQUET
+PARTITIONED BY (partition_col)
+WITH ORDER (constant_col ASC NULLS LAST)
+LOCATION 'test_files/scratch/parquet_sorted_statistics/test_table';
+
+query TT
+EXPLAIN SELECT constant_col
+FROM test_table
+ORDER BY constant_col;
+----
+logical_plan
+01)Sort: test_table.constant_col ASC NULLS LAST
+02)--TableScan: test_table projection=[constant_col]
+physical_plan
+01)SortPreservingMergeExec: [constant_col@0 ASC NULLS LAST]
+02)--SortExec: expr=[constant_col@0 ASC NULLS LAST], preserve_partitioning=[true]
+03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col]
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 11ddb91ad3..50b08e7793 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -93,6 +93,7 @@ pub async fn from_substrait_rel(
},
partition_values: vec![],
range: None,
+ statistics: None,
extensions: None,
};
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index a90f033062..af7a92f403 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -84,6 +84,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption [...]
| datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). [...]
| datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs [...]
+| datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental [...]
| datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. [...]
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores [...]
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible [...]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]