This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6cf74d64e3 Make `SessionContext::register_parquet` obey
`collect_statistics` config (#16080)
6cf74d64e3 is described below
commit 6cf74d64e39ae2003defa872fe10fda56f8ef8f2
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Wed May 21 07:46:26 2025 -0700
Make `SessionContext::register_parquet` obey `collect_statistics` config
(#16080)
* fix
* add a test
* fmt
* add to upgrade guide
* fix tests
* fix test
* fix test
* fix ci
* Fix example in upgrade guide (#29)
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../core/src/datasource/file_format/options.rs | 10 ++---
datafusion/core/src/datasource/listing/table.rs | 41 +++++++++++++++++---
.../core/src/datasource/listing_table_factory.rs | 3 +-
datafusion/core/src/execution/context/parquet.rs | 45 ++++++++++++++++++++++
datafusion/core/tests/parquet/file_statistics.rs | 6 ++-
datafusion/core/tests/sql/explain_analyze.rs | 4 +-
datafusion/core/tests/sql/path_partition.rs | 7 +++-
docs/source/library-user-guide/upgrading.md | 15 ++++++++
8 files changed, 114 insertions(+), 17 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/options.rs
b/datafusion/core/src/datasource/file_format/options.rs
index 08e9a628dd..9aaf1cf598 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -550,7 +550,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
- .with_target_partitions(config.target_partitions())
+ .with_session_config_options(config)
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
}
@@ -585,9 +585,9 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
- .with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
+ .with_session_config_options(config)
}
async fn get_resolved_schema(
@@ -615,7 +615,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
- .with_target_partitions(config.target_partitions())
+ .with_session_config_options(config)
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
}
@@ -643,7 +643,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
- .with_target_partitions(config.target_partitions())
+ .with_session_config_options(config)
.with_table_partition_cols(self.table_partition_cols.clone())
}
@@ -669,7 +669,7 @@ impl ReadOptions<'_> for ArrowReadOptions<'_> {
ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)
- .with_target_partitions(config.target_partitions())
+ .with_session_config_options(config)
.with_table_partition_cols(self.table_partition_cols.clone())
}
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 26daa88c9e..3c87d3ee23 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -32,6 +32,7 @@ use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::{FileScanConfig,
FileScanConfigBuilder};
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
+use datafusion_execution::config::SessionConfig;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
@@ -195,7 +196,8 @@ impl ListingTableConfig {
let listing_options = ListingOptions::new(file_format)
.with_file_extension(listing_file_extension)
- .with_target_partitions(state.config().target_partitions());
+ .with_target_partitions(state.config().target_partitions())
+ .with_collect_stat(state.config().collect_statistics());
Ok(Self {
table_paths: self.table_paths,
@@ -313,18 +315,29 @@ impl ListingOptions {
/// - use default file extension filter
/// - no input partition to discover
/// - one target partition
- /// - stat collection
+ /// - do not collect statistics
pub fn new(format: Arc<dyn FileFormat>) -> Self {
Self {
file_extension: format.get_ext(),
format,
table_partition_cols: vec![],
- collect_stat: true,
+ collect_stat: false,
target_partitions: 1,
file_sort_order: vec![],
}
}
+ /// Set options from [`SessionConfig`] and returns self.
+ ///
+ /// Currently this sets `target_partitions` and `collect_stat`
+ /// but if more options are added in the future that need to be coordinated
+ /// they will be synchronized through this method.
+ pub fn with_session_config_options(mut self, config: &SessionConfig) ->
Self {
+ self = self.with_target_partitions(config.target_partitions());
+ self = self.with_collect_stat(config.collect_statistics());
+ self
+ }
+
/// Set file extension on [`ListingOptions`] and returns self.
///
/// # Example
@@ -1282,7 +1295,9 @@ mod tests {
#[tokio::test]
async fn read_single_file() -> Result<()> {
- let ctx = SessionContext::new();
+ let ctx = SessionContext::new_with_config(
+ SessionConfig::new().with_collect_statistics(true),
+ );
let table = load_table(&ctx, "alltypes_plain.parquet").await?;
let projection = None;
@@ -1309,7 +1324,7 @@ mod tests {
#[cfg(feature = "parquet")]
#[tokio::test]
- async fn load_table_stats_by_default() -> Result<()> {
+ async fn do_not_load_table_stats_by_default() -> Result<()> {
use crate::datasource::file_format::parquet::ParquetFormat;
let testdata = crate::test_util::parquet_test_data();
@@ -1321,6 +1336,22 @@ mod tests {
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
let schema = opt.infer_schema(&state, &table_path).await?;
+ let config = ListingTableConfig::new(table_path.clone())
+ .with_listing_options(opt)
+ .with_schema(schema);
+ let table = ListingTable::try_new(config)?;
+
+ let exec = table.scan(&state, None, &[], None).await?;
+ assert_eq!(exec.partition_statistics(None)?.num_rows,
Precision::Absent);
+ // TODO correct byte size:
https://github.com/apache/datafusion/issues/14936
+ assert_eq!(
+ exec.partition_statistics(None)?.total_byte_size,
+ Precision::Absent
+ );
+
+ let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
+ .with_collect_stat(true);
+ let schema = opt.infer_schema(&state, &table_path).await?;
let config = ListingTableConfig::new(table_path)
.with_listing_options(opt)
.with_schema(schema);
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs
b/datafusion/core/src/datasource/listing_table_factory.rs
index 636d1623c5..71686c61a8 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -111,9 +111,8 @@ impl TableProviderFactory for ListingTableFactory {
let table_path = ListingTableUrl::parse(&cmd.location)?;
let options = ListingOptions::new(file_format)
- .with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
- .with_target_partitions(state.config().target_partitions())
+ .with_session_config_options(session_state.config())
.with_table_partition_cols(table_partition_cols);
options
diff --git a/datafusion/core/src/execution/context/parquet.rs
b/datafusion/core/src/execution/context/parquet.rs
index a7c4887b24..23ac6b8848 100644
--- a/datafusion/core/src/execution/context/parquet.rs
+++ b/datafusion/core/src/execution/context/parquet.rs
@@ -84,6 +84,8 @@ mod tests {
use crate::parquet::basic::Compression;
use crate::test_util::parquet_test_data;
+ use arrow::util::pretty::pretty_format_batches;
+ use datafusion_common::assert_contains;
use datafusion_common::config::TableParquetOptions;
use datafusion_execution::config::SessionConfig;
@@ -129,6 +131,49 @@ mod tests {
Ok(())
}
+ async fn explain_query_all_with_config(config: SessionConfig) ->
Result<String> {
+ let ctx = SessionContext::new_with_config(config);
+
+ ctx.register_parquet(
+ "test",
+ &format!("{}/alltypes_plain*.parquet", parquet_test_data()),
+ ParquetReadOptions::default(),
+ )
+ .await?;
+ let df = ctx.sql("EXPLAIN SELECT * FROM test").await?;
+ let results = df.collect().await?;
+ let content = pretty_format_batches(&results).unwrap().to_string();
+ Ok(content)
+ }
+
+ #[tokio::test]
+ async fn register_parquet_respects_collect_statistics_config() ->
Result<()> {
+ // The default is false
+ let mut config = SessionConfig::new();
+ config.options_mut().explain.physical_plan_only = true;
+ config.options_mut().explain.show_statistics = true;
+ let content = explain_query_all_with_config(config).await?;
+ assert_contains!(content, "statistics=[Rows=Absent,");
+
+ // Explicitly set to false
+ let mut config = SessionConfig::new();
+ config.options_mut().explain.physical_plan_only = true;
+ config.options_mut().explain.show_statistics = true;
+ config.options_mut().execution.collect_statistics = false;
+ let content = explain_query_all_with_config(config).await?;
+ assert_contains!(content, "statistics=[Rows=Absent,");
+
+ // Explicitly set to true
+ let mut config = SessionConfig::new();
+ config.options_mut().explain.physical_plan_only = true;
+ config.options_mut().explain.show_statistics = true;
+ config.options_mut().execution.collect_statistics = true;
+ let content = explain_query_all_with_config(config).await?;
+ assert_contains!(content, "statistics=[Rows=Exact(10),");
+
+ Ok(())
+ }
+
#[tokio::test]
async fn read_from_registered_table_with_glob_path() -> Result<()> {
let ctx = SessionContext::new();
diff --git a/datafusion/core/tests/parquet/file_statistics.rs
b/datafusion/core/tests/parquet/file_statistics.rs
index a038d414cb..a60beaf665 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -50,7 +50,8 @@ async fn check_stats_precision_with_filter_pushdown() {
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
let table_path = ListingTableUrl::parse(filename).unwrap();
- let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+ let opt =
+
ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);
let table = get_listing_table(&table_path, None, &opt).await;
let (_, _, state) = get_cache_runtime_state();
@@ -109,7 +110,8 @@ async fn load_table_stats_with_session_level_cache() {
// Create a separate DefaultFileStatisticsCache
let (cache2, _, state2) = get_cache_runtime_state();
- let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+ let opt =
+
ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);
let table1 = get_listing_table(&table_path, Some(cache1), &opt).await;
let table2 = get_listing_table(&table_path, Some(cache2), &opt).await;
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index e8ef34c2af..3c7d85ef87 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -561,7 +561,9 @@ async fn csv_explain_verbose_plans() {
async fn explain_analyze_runs_optimizers(#[values("*", "1")] count_expr: &str)
{
// repro for https://github.com/apache/datafusion/issues/917
// where EXPLAIN ANALYZE was not correctly running optimizer
- let ctx = SessionContext::new();
+ let ctx = SessionContext::new_with_config(
+ SessionConfig::new().with_collect_statistics(true),
+ );
register_alltypes_parquet(&ctx).await;
// This happens as an optimization pass where count(*)/count(1) can be
diff --git a/datafusion/core/tests/sql/path_partition.rs
b/datafusion/core/tests/sql/path_partition.rs
index 131a396ccb..5e9748d23d 100644
--- a/datafusion/core/tests/sql/path_partition.rs
+++ b/datafusion/core/tests/sql/path_partition.rs
@@ -431,7 +431,9 @@ async fn parquet_multiple_nonstring_partitions() ->
Result<()> {
#[tokio::test]
async fn parquet_statistics() -> Result<()> {
- let ctx = SessionContext::new();
+ let mut config = SessionConfig::new();
+ config.options_mut().execution.collect_statistics = true;
+ let ctx = SessionContext::new_with_config(config);
register_partitioned_alltypes_parquet(
&ctx,
@@ -583,7 +585,8 @@ async fn create_partitioned_alltypes_parquet_table(
.iter()
.map(|x| (x.0.to_owned(), x.1.clone()))
.collect::<Vec<_>>(),
- );
+ )
+ .with_session_config_options(&ctx.copied_config());
let table_path = ListingTableUrl::parse(table_path).unwrap();
let store_path =
diff --git a/docs/source/library-user-guide/upgrading.md
b/docs/source/library-user-guide/upgrading.md
index 620be657b4..a6ff73e13d 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -21,6 +21,21 @@
## DataFusion `48.0.0`
+### `ListingOptions` default for `collect_stat` changed from `true` to `false`
+
+This makes it agree with the default for `SessionConfig`.
+Most users won't be impacted by this change but if you were using
`ListingOptions` directly
+and relied on the default value of `collect_stat` being `true`, you will need
to
+explicitly set it to `true` in your code.
+
+```rust
+# /* comment to avoid running
+ListingOptions::new(Arc::new(ParquetFormat::default()))
+ .with_collect_stat(true)
+ // other options
+# */
+```
+
### Processing `Field` instead of `DataType` for user defined functions
In order to support metadata handling and extension types, user defined
functions are
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]