This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b0015d3a93 feat(parquet): Implement `scan_efficiency_ratio` metric for
parquet reading (#18577)
b0015d3a93 is described below
commit b0015d3a93377c29348bc5a5395f1a3c76a91353
Author: Peter Nguyen <[email protected]>
AuthorDate: Sat Nov 15 18:39:50 2025 -0800
feat(parquet): Implement `scan_efficiency_ratio` metric for parquet reading
(#18577)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Part of #18195
## Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
## What changes are included in this PR?
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
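-->
- Add a `scan_efficiency_ratio` metric to `ParquetFileMetrics`, defined as bytes scanned / total file size and reported at the summary level.
- Implement `Drop` for `ParquetFileReader` and `CachedParquetFileReader`. On drop, each reader adds its `bytes_scanned` to the ratio's numerator and sets the denominator to the file's `object_meta.size`. Several readers may run for the same file, so the total is set rather than added. `ParquetFileReader` gains a `partitioned_file` field for this purpose.
- Add `RatioMergeStrategy` to control how `RatioMetrics::merge` combines values. Its variants are `AddPartAddTotal` (the default, which keeps the previous behavior), `AddPartSetTotal` and `SetPartAddTotal`.
- Add `RatioMetrics::with_merge_strategy`, `set_part` and `set_total`, and `MetricBuilder::ratio_metrics_with_strategy`. `MetricValue::new_empty` now preserves a ratio's merge strategy.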
## Are these changes tested?
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
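Added tests: `test_ratio_set_methods` and `test_ratio_merge_strategy` in `physical-plan/src/metrics/value.rs`. Also added a `scan_efficiency_ratio` assertion to the `parquet_explain_analyze` SQL test.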
## Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
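Yes. A new `scan_efficiency_ratio` metric appears in `EXPLAIN ANALYZE` output for Parquet scans, for example `scan_efficiency_ratio=14% (259/1851)`.

This PR also adds public API items:
- `RatioMergeStrategy`
- `RatioMetrics::{with_merge_strategy, set_part, set_total}`
- `MetricBuilder::ratio_metrics_with_strategy`

`ParquetFileReader` gains a new public field, `partitioned_file`. Any code that constructs `ParquetFileReader` with a struct literal must be updated.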
<!--
If there are any breaking changes to public APIs, please add the `api
change` label.
-->
---
datafusion/core/tests/sql/explain_analyze.rs | 1 +
datafusion/datasource-parquet/src/metrics.rs | 15 ++-
datafusion/datasource-parquet/src/reader.rs | 26 +++++
.../datasource-parquet/src/row_group_filter.rs | 16 ++-
datafusion/physical-plan/src/metrics/builder.rs | 14 ++-
datafusion/physical-plan/src/metrics/mod.rs | 4 +-
datafusion/physical-plan/src/metrics/value.rs | 117 +++++++++++++++++++--
7 files changed, 180 insertions(+), 13 deletions(-)
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index cbc66581e9..71e9e840d9 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -887,6 +887,7 @@ async fn parquet_explain_analyze() {
&formatted,
"row_groups_pruned_statistics=1 total \u{2192} 1 matched"
);
+ assert_contains!(&formatted, "scan_efficiency_ratio=14% (259/1851)");
// The order of metrics is expected to be the same as the actual pruning
order
// (file-> row-group -> page)
diff --git a/datafusion/datasource-parquet/src/metrics.rs
b/datafusion/datasource-parquet/src/metrics.rs
index 306bc9e6b0..5eaa137e9a 100644
--- a/datafusion/datasource-parquet/src/metrics.rs
+++ b/datafusion/datasource-parquet/src/metrics.rs
@@ -16,7 +16,8 @@
// under the License.
use datafusion_physical_plan::metrics::{
- Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics,
Time,
+ Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, PruningMetrics,
+ RatioMergeStrategy, RatioMetrics, Time,
};
/// Stores metrics about the parquet execution for a particular parquet file.
@@ -66,6 +67,8 @@ pub struct ParquetFileMetrics {
pub page_index_eval_time: Time,
/// Total time spent reading and parsing metadata from the footer
pub metadata_load_time: Time,
+ /// Scan Efficiency Ratio, calculated as bytes_scanned / total_file_size
+ pub scan_efficiency_ratio: RatioMetrics,
/// Predicate Cache: number of records read directly from the inner reader.
/// This is the number of rows decoded while evaluating predicates
pub predicate_cache_inner_records: Count,
@@ -114,6 +117,15 @@ impl ParquetFileMetrics {
.with_type(MetricType::SUMMARY)
.pruning_metrics("files_ranges_pruned_statistics", partition);
+ let scan_efficiency_ratio = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .with_type(MetricType::SUMMARY)
+ .ratio_metrics_with_strategy(
+ "scan_efficiency_ratio",
+ partition,
+ RatioMergeStrategy::AddPartSetTotal,
+ );
+
// -----------------------
// 'dev' level metrics
// -----------------------
@@ -164,6 +176,7 @@ impl ParquetFileMetrics {
bloom_filter_eval_time,
page_index_eval_time,
metadata_load_time,
+ scan_efficiency_ratio,
predicate_cache_inner_records,
predicate_cache_records,
}
diff --git a/datafusion/datasource-parquet/src/reader.rs
b/datafusion/datasource-parquet/src/reader.rs
index 6670109cf7..59a5da7b9d 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -97,6 +97,7 @@ impl DefaultParquetFileReaderFactory {
pub struct ParquetFileReader {
pub file_metrics: ParquetFileMetrics,
pub inner: ParquetObjectReader,
+ pub partitioned_file: PartitionedFile,
}
impl AsyncFileReader for ParquetFileReader {
@@ -129,6 +130,18 @@ impl AsyncFileReader for ParquetFileReader {
}
}
+impl Drop for ParquetFileReader {
+ fn drop(&mut self) {
+ self.file_metrics
+ .scan_efficiency_ratio
+ .add_part(self.file_metrics.bytes_scanned.value());
+ // Multiple ParquetFileReaders may run, so we set_total to avoid
adding the total multiple times
+ self.file_metrics
+ .scan_efficiency_ratio
+ .set_total(self.partitioned_file.object_meta.size as usize);
+ }
+}
+
impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
fn create_reader(
&self,
@@ -156,6 +169,7 @@ impl ParquetFileReaderFactory for
DefaultParquetFileReaderFactory {
Ok(Box::new(ParquetFileReader {
inner,
file_metrics,
+ partitioned_file,
}))
}
}
@@ -306,6 +320,18 @@ impl AsyncFileReader for CachedParquetFileReader {
}
}
+impl Drop for CachedParquetFileReader {
+ fn drop(&mut self) {
+ self.file_metrics
+ .scan_efficiency_ratio
+ .add_part(self.file_metrics.bytes_scanned.value());
+ // Multiple ParquetFileReaders may run, so we set_total to avoid
adding the total multiple times
+ self.file_metrics
+ .scan_efficiency_ratio
+ .set_total(self.partitioned_file.object_meta.size as usize);
+ }
+}
+
/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);
diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs
b/datafusion/datasource-parquet/src/row_group_filter.rs
index 2043f75070..90e4e10d5a 100644
--- a/datafusion/datasource-parquet/src/row_group_filter.rs
+++ b/datafusion/datasource-parquet/src/row_group_filter.rs
@@ -1533,6 +1533,7 @@ mod tests {
data: bytes::Bytes,
pruning_predicate: &PruningPredicate,
) -> Result<RowGroupAccessPlanFilter> {
+ use datafusion_datasource::PartitionedFile;
use object_store::{ObjectMeta, ObjectStore};
let object_meta = ObjectMeta {
@@ -1551,12 +1552,23 @@ mod tests {
let metrics = ExecutionPlanMetricsSet::new();
let file_metrics =
ParquetFileMetrics::new(0, object_meta.location.as_ref(),
&metrics);
- let inner = ParquetObjectReader::new(Arc::new(in_memory),
object_meta.location)
- .with_file_size(object_meta.size);
+ let inner =
+ ParquetObjectReader::new(Arc::new(in_memory),
object_meta.location.clone())
+ .with_file_size(object_meta.size);
+
+ let partitioned_file = PartitionedFile {
+ object_meta,
+ partition_values: vec![],
+ range: None,
+ statistics: None,
+ extensions: None,
+ metadata_size_hint: None,
+ };
let reader = ParquetFileReader {
inner,
file_metrics: file_metrics.clone(),
+ partitioned_file,
};
let mut builder =
ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
diff --git a/datafusion/physical-plan/src/metrics/builder.rs
b/datafusion/physical-plan/src/metrics/builder.rs
index 91b2440122..21f6762e73 100644
--- a/datafusion/physical-plan/src/metrics/builder.rs
+++ b/datafusion/physical-plan/src/metrics/builder.rs
@@ -20,7 +20,7 @@
use std::{borrow::Cow, sync::Arc};
use crate::metrics::{
- value::{PruningMetrics, RatioMetrics},
+ value::{PruningMetrics, RatioMergeStrategy, RatioMetrics},
MetricType,
};
@@ -283,7 +283,17 @@ impl<'a> MetricBuilder<'a> {
name: impl Into<Cow<'static, str>>,
partition: usize,
) -> RatioMetrics {
- let ratio_metrics = RatioMetrics::new();
+ self.ratio_metrics_with_strategy(name, partition,
RatioMergeStrategy::default())
+ }
+
+ /// Consumes self and creates a new [`RatioMetrics`] with a specific merge
strategy
+ pub fn ratio_metrics_with_strategy(
+ self,
+ name: impl Into<Cow<'static, str>>,
+ partition: usize,
+ merge_strategy: RatioMergeStrategy,
+ ) -> RatioMetrics {
+ let ratio_metrics =
RatioMetrics::new().with_merge_strategy(merge_strategy);
self.with_partition(partition).build(MetricValue::Ratio {
name: name.into(),
ratio_metrics: ratio_metrics.clone(),
diff --git a/datafusion/physical-plan/src/metrics/mod.rs
b/datafusion/physical-plan/src/metrics/mod.rs
index 613c031808..c39779090b 100644
--- a/datafusion/physical-plan/src/metrics/mod.rs
+++ b/datafusion/physical-plan/src/metrics/mod.rs
@@ -36,8 +36,8 @@ pub use baseline::{BaselineMetrics, RecordOutput,
SpillMetrics, SplitMetrics};
pub use builder::MetricBuilder;
pub use custom::CustomMetricValue;
pub use value::{
- Count, Gauge, MetricValue, PruningMetrics, RatioMetrics, ScopedTimerGuard,
Time,
- Timestamp,
+ Count, Gauge, MetricValue, PruningMetrics, RatioMergeStrategy,
RatioMetrics,
+ ScopedTimerGuard, Time, Timestamp,
};
/// Something that tracks a value of interest (metric) of a DataFusion
diff --git a/datafusion/physical-plan/src/metrics/value.rs
b/datafusion/physical-plan/src/metrics/value.rs
index 7f31f75794..a5032ec8de 100644
--- a/datafusion/physical-plan/src/metrics/value.rs
+++ b/datafusion/physical-plan/src/metrics/value.rs
@@ -437,6 +437,15 @@ impl PruningMetrics {
pub struct RatioMetrics {
part: Arc<AtomicUsize>,
total: Arc<AtomicUsize>,
+ merge_strategy: RatioMergeStrategy,
+}
+
+#[derive(Debug, Clone, Default)]
+pub enum RatioMergeStrategy {
+ #[default]
+ AddPartAddTotal,
+ AddPartSetTotal,
+ SetPartAddTotal,
}
impl RatioMetrics {
@@ -445,9 +454,15 @@ impl RatioMetrics {
Self {
part: Arc::new(AtomicUsize::new(0)),
total: Arc::new(AtomicUsize::new(0)),
+ merge_strategy: RatioMergeStrategy::AddPartAddTotal,
}
}
+ pub fn with_merge_strategy(mut self, merge_strategy: RatioMergeStrategy)
-> Self {
+ self.merge_strategy = merge_strategy;
+ self
+ }
+
/// Add `n` to the numerator (`part`) value
pub fn add_part(&self, n: usize) {
self.part.fetch_add(n, Ordering::Relaxed);
@@ -458,10 +473,32 @@ impl RatioMetrics {
self.total.fetch_add(n, Ordering::Relaxed);
}
+ /// Set the numerator (`part`) value to `n`, overwriting any existing value
+ pub fn set_part(&self, n: usize) {
+ self.part.store(n, Ordering::Relaxed);
+ }
+
+ /// Set the denominator (`total`) value to `n`, overwriting any existing
value
+ pub fn set_total(&self, n: usize) {
+ self.total.store(n, Ordering::Relaxed);
+ }
+
/// Merge the value from `other` into `self`
pub fn merge(&self, other: &Self) {
- self.add_part(other.part());
- self.add_total(other.total());
+ match self.merge_strategy {
+ RatioMergeStrategy::AddPartAddTotal => {
+ self.add_part(other.part());
+ self.add_total(other.total());
+ }
+ RatioMergeStrategy::AddPartSetTotal => {
+ self.add_part(other.part());
+ self.set_total(other.total());
+ }
+ RatioMergeStrategy::SetPartAddTotal => {
+ self.set_part(other.part());
+ self.add_total(other.total());
+ }
+ }
}
/// Return the numerator (`part`) value
@@ -784,10 +821,17 @@ impl MetricValue {
name: name.clone(),
pruning_metrics: PruningMetrics::new(),
},
- Self::Ratio { name, .. } => Self::Ratio {
- name: name.clone(),
- ratio_metrics: RatioMetrics::new(),
- },
+ Self::Ratio {
+ name,
+ ratio_metrics,
+ } => {
+ let merge_strategy = ratio_metrics.merge_strategy.clone();
+ Self::Ratio {
+ name: name.clone(),
+ ratio_metrics: RatioMetrics::new()
+ .with_merge_strategy(merge_strategy),
+ }
+ }
Self::Custom { name, value } => Self::Custom {
name: name.clone(),
value: value.new_empty(),
@@ -1140,6 +1184,67 @@ mod tests {
assert_eq!("0.033% (1/3000)", tiny_ratio.to_string());
}
+ #[test]
+ fn test_ratio_set_methods() {
+ let ratio_metrics = RatioMetrics::new();
+
+ // Ensure set methods don't increment
+ ratio_metrics.set_part(10);
+ ratio_metrics.set_part(10);
+ ratio_metrics.set_total(40);
+ ratio_metrics.set_total(40);
+ assert_eq!("25% (10/40)", ratio_metrics.to_string());
+
+ let ratio_metrics = RatioMetrics::new();
+
+ // Calling set should change the value
+ ratio_metrics.set_part(10);
+ ratio_metrics.set_part(30);
+ ratio_metrics.set_total(40);
+ ratio_metrics.set_total(50);
+ assert_eq!("60% (30/50)", ratio_metrics.to_string());
+ }
+
+ #[test]
+ fn test_ratio_merge_strategy() {
+ // Test AddPartSetTotal strategy
+ let ratio_metrics1 =
+
RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
+
+ ratio_metrics1.set_part(10);
+ ratio_metrics1.set_total(40);
+ assert_eq!("25% (10/40)", ratio_metrics1.to_string());
+ let ratio_metrics2 =
+
RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::AddPartSetTotal);
+ ratio_metrics2.set_part(20);
+ ratio_metrics2.set_total(40);
+ assert_eq!("50% (20/40)", ratio_metrics2.to_string());
+
+ ratio_metrics1.merge(&ratio_metrics2);
+ assert_eq!("75% (30/40)", ratio_metrics1.to_string());
+
+ // Test SetPartAddTotal strategy
+ let ratio_metrics1 =
+
RatioMetrics::new().with_merge_strategy(RatioMergeStrategy::SetPartAddTotal);
+ ratio_metrics1.set_part(20);
+ ratio_metrics1.set_total(50);
+ let ratio_metrics2 = RatioMetrics::new();
+ ratio_metrics2.set_part(20);
+ ratio_metrics2.set_total(50);
+ ratio_metrics1.merge(&ratio_metrics2);
+ assert_eq!("20% (20/100)", ratio_metrics1.to_string());
+
+ // Test AddPartAddTotal strategy (default)
+ let ratio_metrics1 = RatioMetrics::new();
+ ratio_metrics1.set_part(20);
+ ratio_metrics1.set_total(50);
+ let ratio_metrics2 = RatioMetrics::new();
+ ratio_metrics2.set_part(20);
+ ratio_metrics2.set_total(50);
+ ratio_metrics1.merge(&ratio_metrics2);
+ assert_eq!("40% (40/100)", ratio_metrics1.to_string());
+ }
+
#[test]
fn test_display_timestamp() {
let timestamp = Timestamp::new();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]