This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 102caeb226 minor: More comments to `ParquetOpener::open()` (#19677)
102caeb226 is described below
commit 102caeb2261c5ae006c201546cf74769d80ceff8
Author: Yongting You <[email protected]>
AuthorDate: Thu Jan 8 11:14:09 2026 +0800
minor: More comments to `ParquetOpener::open()` (#19677)
## Which issue does this PR close?
- Closes #.
## Rationale for this change
`ParquetOpener::open()` is a critical function for Parquet planning: it is
the entry point for many major steps such as row-group and file pruning.
It is now almost 400 lines long, so this PR adds markers to the important
code blocks/steps to make the function easier to navigate (though I may
have overlooked some critical steps).
Ideally, we should break these blocks into utilities. I tried extracting
some of them with AI, but the resulting utilities still have unclear
semantics, with many input arguments and outputs (see the sketch below),
so overall the complexity doesn't seem reduced by the change. I think it's
possible to factor them into helper functions with clear semantics, but
that likely requires someone who understands the implementation details
very well.
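
For illustration, a hypothetical extraction might take roughly the shape
below. None of these type or function names exist in DataFusion; they are
invented here to show how much of `open()`'s local state even a single
block reads and writes.

```rust
// Hypothetical sketch only: these types and names are invented for
// illustration and are not part of DataFusion's API.

/// State the row-group pruning block would need to read from `open()`.
struct RowGroupPruneInputs {
    file_byte_range: Option<(u64, u64)>, // byte range assigned to this partition
    has_predicate: bool,                 // whether a pushed-down filter exists
    enable_bloom_filter: bool,           // whether bloom filters may be consulted
    num_row_groups: usize,               // taken from the file's footer metadata
}

/// State the block writes back: the surviving row groups plus the
/// pruning metrics that `open()` records along the way.
struct RowGroupPruneOutputs {
    kept_row_groups: Vec<usize>,
    pruned_by_range: usize,
    pruned_by_statistics: usize,
}

/// Placeholder body that keeps every row group; the point is the wide
/// signature, not the pruning logic.
fn prune_row_groups(inputs: &RowGroupPruneInputs) -> RowGroupPruneOutputs {
    RowGroupPruneOutputs {
        kept_row_groups: (0..inputs.num_row_groups).collect(),
        pruned_by_range: 0,
        pruned_by_statistics: 0,
    }
}
```

Even in this reduced form the helper carries the partition range, predicate
state, config flags, and metadata in, and both data and metrics out, which
is why the extraction did not obviously reduce complexity.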
## What changes are included in this PR?
## Are these changes tested?
## Are there any user-facing changes?
---
datafusion/datasource-parquet/src/opener.rs | 43 ++++++++++++++++++++++++++---
1 file changed, 39 insertions(+), 4 deletions(-)
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 83bdf79c8f..570f9b4412 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -180,6 +180,9 @@ impl PreparedAccessPlan {
impl FileOpener for ParquetOpener {
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
+ // -----------------------------------
+ // Step: prepare configurations, etc.
+ // -----------------------------------
let file_range = partitioned_file.range.clone();
let extensions = partitioned_file.extensions.clone();
let file_location = partitioned_file.object_meta.location.clone();
@@ -280,6 +283,10 @@ impl FileOpener for ParquetOpener {
.get_file_decryption_properties(&file_location)
.await?;
+ // ---------------------------------------------
+ // Step: try to prune the current file partition
+ // ---------------------------------------------
+
// Prune this file using the file level statistics and partition values.
// Since dynamic filters may have been updated since planning it is possible that we are able
// to prune files now that we couldn't prune at planning time.
@@ -328,6 +335,10 @@ impl FileOpener for ParquetOpener {
file_metrics.files_ranges_pruned_statistics.add_matched(1);
+ // --------------------------------------------------------
+ // Step: fetch Parquet metadata (and optionally page index)
+ // --------------------------------------------------------
+
// Don't load the page index yet. Since it is not stored inline in
// the footer, loading the page index if it is not needed will do
// unnecessary I/O. We decide later if it is needed to evaluate the
@@ -428,14 +439,21 @@ impl FileOpener for ParquetOpener {
metadata_timer.stop();
+ // ---------------------------------------------------------
+ // Step: construct builder for the final RecordBatch stream
+ // ---------------------------------------------------------
+
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
    async_file_reader,
    reader_metadata,
);
- let indices = projection.column_indices();
-
- let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
+ // ---------------------------------------------------------------------
+ // Step: optionally add row filter to the builder
+ //
+ // Row filter is used for late materialization in parquet decoding, see
+ // `row_filter` for details.
+ // ---------------------------------------------------------------------
// Filter pushdown: evaluate predicates during scan
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
@@ -464,6 +482,10 @@ impl FileOpener for ParquetOpener {
builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
}
+ // ------------------------------------------------------------
+ // Step: prune row groups by range, predicate and bloom filter
+ // ------------------------------------------------------------
+
// Determine which row groups to actually read. The idea is to skip
// as many row groups as possible based on the metadata and query
let file_metadata = Arc::clone(builder.metadata());
@@ -525,9 +547,13 @@ impl FileOpener for ParquetOpener {
let mut access_plan = row_groups.build();
+ // --------------------------------------------------------
+ // Step: prune pages from the kept row groups
+ //
// page index pruning: if all data on individual pages can
// be ruled out using page metadata, rows from other columns
// with that range can be skipped as well
+ // --------------------------------------------------------
if enable_page_index
&& !access_plan.is_empty()
&& let Some(p) = page_pruning_predicate
@@ -545,7 +571,10 @@ impl FileOpener for ParquetOpener {
let mut prepared_plan =
    PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)?;
- // If reverse scanning is enabled, reverse the prepared plan
+ // ----------------------------------------------------------
+ // Step: potentially reverse the access plan for performance.
+ // See `ParquetSource::try_reverse_output` for the rationale.
+ // ----------------------------------------------------------
if reverse_row_groups {
prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
}
@@ -564,6 +593,9 @@ impl FileOpener for ParquetOpener {
// metrics from the arrow reader itself
let arrow_reader_metrics = ArrowReaderMetrics::enabled();
+ let indices = projection.column_indices();
+ let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
+
let stream = builder
.with_projection(mask)
.with_batch_size(batch_size)
@@ -621,6 +653,9 @@ impl FileOpener for ParquetOpener {
})
});
+ // ----------------------------------------------------------------------
+ // Step: wrap the stream so a dynamic filter can stop the file scan early
+ // ----------------------------------------------------------------------
if let Some(file_pruner) = file_pruner {
Ok(EarlyStoppingStream::new(
stream,
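
For readers who do not want to walk the whole diff, the markers added above
amount to the outline below. It paraphrases the step comments from this
commit; the function here is a placeholder, not real DataFusion code.

```rust
// Outline of the steps the new markers name inside `ParquetOpener::open()`,
// paraphrased from the comments in the diff above.
fn open_outline() {
    // 1. Prepare configurations (file range, projection, decryption, ...)
    // 2. Try to prune the whole file partition using file-level statistics,
    //    partition values, and dynamic filters updated since planning
    // 3. Fetch the Parquet metadata (load the page index only if needed)
    // 4. Construct the builder for the final RecordBatch stream
    // 5. Optionally add a row filter for late materialization in decoding
    // 6. Prune row groups by range, predicate, and bloom filter
    // 7. Prune pages from the kept row groups using the page index
    // 8. Potentially reverse the access plan (see
    //    `ParquetSource::try_reverse_output` for the rationale)
    // 9. Wrap the stream so a dynamic filter can stop the file scan early
}
```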
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]