This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e54eb42388 Fix predicate_rows_pruned & predicate_rows_matched metrics (#18980)
e54eb42388 is described below

commit e54eb423881f2baa3800cba3338ca6f9b5a0d300
Author: xudong.w <[email protected]>
AuthorDate: Mon Dec 1 04:49:23 2025 +0800

    Fix predicate_rows_pruned & predicate_rows_matched metrics (#18980)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #.
    
    ## Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    ## What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    ## Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 .../core/src/datasource/physical_plan/parquet.rs   | 94 ++++++++++++++++++++++
 datafusion/datasource-parquet/src/row_filter.rs    | 25 +++++-
 2 files changed, 116 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs
index 90953e3f5d..4d7c4a8788 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1874,6 +1874,100 @@ mod tests {
         assert_contains!(&explain, "projection=[c1]");
     }
 
+    #[tokio::test]
+    async fn parquet_exec_metrics_with_multiple_predicates() {
+        // Test that metrics are correctly calculated when multiple predicates
+        // are pushed down (connected with AND). This ensures we don't double-count
+        // rows when multiple predicates filter the data sequentially.
+
+        // Create a batch with two columns: c1 (string) and c2 (int32)
+        // Total: 10 rows
+        let c1: ArrayRef = Arc::new(StringArray::from(vec![
+            Some("foo"), // 0 - passes c1 filter, fails c2 filter (5 <= 10)
+            Some("bar"), // 1 - fails c1 filter
+            Some("bar"), // 2 - fails c1 filter
+            Some("baz"), // 3 - passes both filters (20 > 10)
+            Some("foo"), // 4 - passes both filters (12 > 10)
+            Some("bar"), // 5 - fails c1 filter
+            Some("baz"), // 6 - passes both filters (25 > 10)
+            Some("foo"), // 7 - passes c1 filter, fails c2 filter (7 <= 10)
+            Some("bar"), // 8 - fails c1 filter
+            Some("qux"), // 9 - passes both filters (30 > 10)
+        ]));
+
+        let c2: ArrayRef = Arc::new(Int32Array::from(vec![
+            Some(5),
+            Some(15),
+            Some(8),
+            Some(20),
+            Some(12),
+            Some(9),
+            Some(25),
+            Some(7),
+            Some(18),
+            Some(30),
+        ]));
+
+        let batch = create_batch(vec![("c1", c1), ("c2", c2)]);
+
+        // Create filter: c1 != 'bar' AND c2 > 10
+        //
+        // First predicate (c1 != 'bar'):
+        //   - Rows passing: 0, 3, 4, 6, 7, 9 (6 rows)
+        //   - Rows pruned: 1, 2, 5, 8 (4 rows)
+        //
+        // Second predicate (c2 > 10) on remaining 6 rows:
+        //   - Rows passing: 3, 4, 6, 9 (4 rows with c2 = 20, 12, 25, 30)
+        //   - Rows pruned: 0, 7 (2 rows with c2 = 5, 7)
+        //
+        // Expected final metrics:
+        //   - pushdown_rows_matched: 4 (final result)
+        //   - pushdown_rows_pruned: 4 + 2 = 6 (cumulative)
+        //   - Total: 4 + 6 = 10
+
+        let filter = col("c1").not_eq(lit("bar")).and(col("c2").gt(lit(10)));
+
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_pushdown_predicate()
+            .round_trip(vec![batch])
+            .await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // Verify the result rows
+        assert_snapshot!(batches_to_string(&rt.batches.unwrap()),@r###"
+            +-----+----+
+            | c1  | c2 |
+            +-----+----+
+            | baz | 20 |
+            | foo | 12 |
+            | baz | 25 |
+            | qux | 30 |
+            +-----+----+
+        "###);
+
+        // Verify metrics - this is the key test
+        let pushdown_rows_matched = get_value(&metrics, "pushdown_rows_matched");
+        let pushdown_rows_pruned = get_value(&metrics, "pushdown_rows_pruned");
+
+        assert_eq!(
+            pushdown_rows_matched, 4,
+            "Expected 4 rows to pass both predicates"
+        );
+        assert_eq!(
+            pushdown_rows_pruned, 6,
+            "Expected 6 rows to be pruned (4 by first predicate + 2 by second predicate)"
+        );
+
+        // The sum should equal the total number of rows
+        assert_eq!(
+            pushdown_rows_matched + pushdown_rows_pruned,
+            10,
+            "matched + pruned should equal total rows"
+        );
+    }
+
     #[tokio::test]
     async fn parquet_exec_has_no_pruning_predicate_if_can_not_prune() {
         // batch1: c1(string)
diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs
index 660b32f486..de9fe181f8 100644
--- a/datafusion/datasource-parquet/src/row_filter.rs
+++ b/datafusion/datasource-parquet/src/row_filter.rs
@@ -470,14 +470,33 @@ pub fn build_row_filter(
         });
     }
 
+    // To avoid double-counting metrics when multiple predicates are used:
+    // - All predicates should count rows_pruned (cumulative pruned rows)
+    // - Only the last predicate should count rows_matched (final result)
+    // This ensures: rows_matched + rows_pruned = total rows processed
+    let total_candidates = candidates.len();
+
     candidates
         .into_iter()
-        .map(|candidate| {
+        .enumerate()
+        .map(|(idx, candidate)| {
+            let is_last = idx == total_candidates - 1;
+
+            // All predicates share the pruned counter (cumulative)
+            let predicate_rows_pruned = rows_pruned.clone();
+
+            // Only the last predicate tracks matched rows (final result)
+            let predicate_rows_matched = if is_last {
+                rows_matched.clone()
+            } else {
+                metrics::Count::new()
+            };
+
             DatafusionArrowPredicate::try_new(
                 candidate,
                 metadata,
-                rows_pruned.clone(),
-                rows_matched.clone(),
+                predicate_rows_pruned,
+                predicate_rows_matched,
                 time.clone(),
             )
             .map(|pred| Box::new(pred) as _)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to