This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f1e5c94f3a Compute Dynamic Filters only when a consumer supports them (#19546)
f1e5c94f3a is described below

commit f1e5c94f3ab3722c15984408ae34cae82a216665
Author: Lía Adriana <[email protected]>
AuthorDate: Wed Dec 31 02:04:33 2025 +0100

    Compute Dynamic Filters only when a consumer supports them (#19546)
    
    ## Which issue does this PR close?
    
    Closes https://github.com/apache/datafusion/issues/17527
    
    ## Rationale for this change
    
    Currently, DataFusion computes bounds for every query that contains a
    HashJoinExec node whenever the option enable_dynamic_filter_pushdown
    is set to true (the default). These bounds should only be computed
    when we know there is a consumer that will actually use them.
    
    ## What changes are included in this PR?
    
    As suggested in
    https://github.com/apache/datafusion/issues/17527#issuecomment-3576945224,
    this PR adds an is_used() method to DynamicFilterPhysicalExpr that
    checks whether any consumer is holding a reference to the filter,
    using Arc::strong_count().
    
    During filter pushdown, consumers that accept the filter and use it
    later during execution (for example, scan nodes such as ParquetSource)
    must retain a reference to the shared Arc.
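
    As a rough sketch of the underlying idea, using only plain std types
    rather than the actual DataFusion code: a producer can tell whether any
    consumer still holds a clone of a shared Arc by checking its strong
    count.

    ```rust
    use std::sync::{Arc, RwLock};

    fn main() {
        // Shared state standing in for the filter's inner Arc<RwLock<Inner>>.
        let inner = Arc::new(RwLock::new(0u32));

        // Only the producer holds it: strong_count == 1 means "not used".
        assert_eq!(Arc::strong_count(&inner), 1);

        // A consumer clones the Arc (as a scan node does when it accepts the
        // pushed-down filter): strong_count > 1 means "used".
        let consumer = Arc::clone(&inner);
        assert!(Arc::strong_count(&inner) > 1);

        // Once the consumer is gone, the producer is the only holder again.
        drop(consumer);
        assert_eq!(Arc::strong_count(&inner), 1);
    }
    ```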
    
    ## Are these changes tested?
    
    I added a unit test in dynamic_filters.rs (test_is_used) that verifies
    the Arc reference counting behavior.
    Existing integration tests in
    datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs validate
    the end-to-end behavior. These tests verify that dynamic filters are
    computed and filled when consumers are present.
    
    
    
    ## Are there any user-facing changes?
    
    Yes: a new is_used() method on DynamicFilterPhysicalExpr.
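
    A minimal usage sketch mirroring the new unit test (import paths are
    assumed, not taken from this patch):

    ```rust
    use std::sync::Arc;

    // NOTE: import paths are assumed; the API shape follows the new unit test.
    use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
    use datafusion_physical_expr::PhysicalExpr;

    fn main() -> datafusion_common::Result<()> {
        let filter = Arc::new(DynamicFilterPhysicalExpr::new(
            vec![],
            lit(true) as Arc<dyn PhysicalExpr>,
        ));

        // No consumer holds the filter yet, so bounds computation can be skipped.
        assert!(!filter.is_used());

        // A consumer produced during filter pushdown shares the same inner
        // state, so the producer now sees the filter as used.
        let _consumer = Arc::clone(&filter).with_new_children(vec![])?;
        assert!(filter.is_used());
        Ok(())
    }
    ```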
---
 .../physical_optimizer/filter_pushdown/mod.rs      | 88 ++++++++++++++++++++++
 .../src/expressions/dynamic_filters.rs             | 66 +++++++++++++++-
 .../physical-plan/src/joins/hash_join/exec.rs      | 37 ++++++++-
 3 files changed, 188 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index f480de71d6..d6357fdf6b 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -3512,3 +3512,91 @@ async fn test_hashjoin_hash_table_pushdown_integer_keys() {
     ",
     );
 }
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown_is_used() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Test both cases: probe side with and without filter pushdown support
+    for (probe_supports_pushdown, expected_is_used) in [(false, false), (true, true)] {
+        let build_side_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Utf8, false),
+        ]));
+        let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+            .with_support(true)
+            .with_batches(vec![
+                record_batch!(("a", Utf8, ["aa", "ab"]), ("b", Utf8, ["ba", "bb"]))
+                    .unwrap(),
+            ])
+            .build();
+
+        let probe_side_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Utf8, false),
+        ]));
+        let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+            .with_support(probe_supports_pushdown)
+            .with_batches(vec![
+                record_batch!(
+                    ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+                    ("b", Utf8, ["ba", "bb", "bc", "bd"])
+                )
+                .unwrap(),
+            ])
+            .build();
+
+        let on = vec![
+            (
+                col("a", &build_side_schema).unwrap(),
+                col("a", &probe_side_schema).unwrap(),
+            ),
+            (
+                col("b", &build_side_schema).unwrap(),
+                col("b", &probe_side_schema).unwrap(),
+            ),
+        ];
+        let plan = Arc::new(
+            HashJoinExec::try_new(
+                build_scan,
+                probe_scan,
+                on,
+                None,
+                &JoinType::Inner,
+                None,
+                PartitionMode::CollectLeft,
+                datafusion_common::NullEquality::NullEqualsNothing,
+            )
+            .unwrap(),
+        ) as Arc<dyn ExecutionPlan>;
+
+        // Apply filter pushdown optimization
+        let mut config = ConfigOptions::default();
+        config.execution.parquet.pushdown_filters = true;
+        config.optimizer.enable_dynamic_filter_pushdown = true;
+        let plan = FilterPushdown::new_post_optimization()
+            .optimize(plan, &config)
+            .unwrap();
+
+        // Get the HashJoinExec to check the dynamic filter
+        let hash_join = plan
+            .as_any()
+            .downcast_ref::<HashJoinExec>()
+            .expect("Plan should be HashJoinExec");
+
+        // Verify that a dynamic filter was created
+        let dynamic_filter = hash_join
+            .dynamic_filter_for_test()
+            .expect("Dynamic filter should be created");
+
+        // Verify that is_used() returns the expected value based on probe side support.
+        // When probe_supports_pushdown=false: no consumer holds a reference (is_used=false)
+        // When probe_supports_pushdown=true: probe side holds a reference (is_used=true)
+        assert_eq!(
+            dynamic_filter.is_used(),
+            expected_is_used,
+            "is_used() should return {expected_is_used} when probe side support is {probe_supports_pushdown}"
+        );
+    }
+}
diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index 615d9cbbf6..fd8b266725 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -278,12 +278,17 @@ impl DynamicFilterPhysicalExpr {
 
     /// Wait asynchronously until this dynamic filter is marked as complete.
     ///
-    /// This method returns immediately if the filter is already complete.
+    /// This method returns immediately if the filter is already complete or if the filter
+    /// is not being used by any consumers.
     /// Otherwise, it waits until [`Self::mark_complete`] is called.
     ///
     /// Unlike [`Self::wait_update`], this method guarantees that when it returns,
     /// the filter is fully complete with no more updates expected.
-    pub async fn wait_complete(&self) {
+    pub async fn wait_complete(self: &Arc<Self>) {
+        if !self.is_used() {
+            return;
+        }
+
         if self.inner.read().is_complete {
             return;
         }
@@ -294,6 +299,22 @@ impl DynamicFilterPhysicalExpr {
             .await;
     }
 
+    /// Check if this dynamic filter is being actively used by any consumers.
+    ///
+    /// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec
+    /// that created the filter). This is useful to avoid computing expensive filter
+    /// expressions when no consumer will actually use them.
+    ///
+    /// Note: We check the inner Arc's strong_count, not the outer Arc's count, because
+    /// when filters are transformed (e.g., via reassign_expr_columns during filter pushdown),
+    /// new outer Arc instances are created via with_new_children(), but they all share the
+    /// same inner `Arc<RwLock<Inner>>`. This is what allows filter updates to propagate to
+    /// consumers even after transformation.
+    pub fn is_used(self: &Arc<Self>) -> bool {
+        // Strong count > 1 means at least one consumer is holding a reference beyond the producer.
+        Arc::strong_count(&self.inner) > 1
+    }
+
     fn render(
         &self,
         f: &mut std::fmt::Formatter<'_>,
@@ -691,4 +712,45 @@ mod test {
             "Expected b + d = [1010, 2020, 3030], got {arr_2:?}",
         );
     }
+
+    #[test]
+    fn test_is_used() {
+        let filter = Arc::new(DynamicFilterPhysicalExpr::new(
+            vec![],
+            lit(true) as Arc<dyn PhysicalExpr>,
+        ));
+
+        // Initially, only one reference to the inner Arc exists
+        assert!(
+            !filter.is_used(),
+            "Filter should not be used with only one inner reference"
+        );
+
+        // Simulate a consumer created via transformation (what happens during filter pushdown).
+        // When filters are pushed down and transformed via reassign_expr_columns/transform_down,
+        // with_new_children() is called which creates a new outer Arc but clones the inner Arc.
+        let consumer1_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap();
+        let _consumer1 = consumer1_expr
+            .as_any()
+            .downcast_ref::<DynamicFilterPhysicalExpr>()
+            .expect("Should be DynamicFilterPhysicalExpr");
+
+        // Now the inner Arc is shared (inner_count = 2)
+        assert!(
+            filter.is_used(),
+            "Filter should be used when inner Arc is shared with transformed consumer"
+        );
+
+        // Create another transformed consumer
+        let consumer2_expr = Arc::clone(&filter).with_new_children(vec![]).unwrap();
+        let _consumer2 = consumer2_expr
+            .as_any()
+            .downcast_ref::<DynamicFilterPhysicalExpr>()
+            .expect("Should be DynamicFilterPhysicalExpr");
+
+        assert!(
+            filter.is_used(),
+            "Filter should still be used with multiple consumers"
+        );
+    }
 }
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index bd92cf4964..91fc1ee443 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -508,6 +508,17 @@ impl HashJoinExec {
         self.null_equality
     }
 
+    /// Get the dynamic filter expression for testing purposes.
+    /// Returns `None` if no dynamic filter has been set.
+    ///
+    /// This method is intended for testing only and should not be used in production code.
+    #[doc(hidden)]
+    pub fn dynamic_filter_for_test(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
+        self.dynamic_filter
+            .as_ref()
+            .map(|df| Arc::clone(&df.filter))
+    }
+
     /// Calculate order preservation flags for this hash join.
     fn maintains_input_order(join_type: JoinType) -> Vec<bool> {
         vec![
@@ -921,7 +932,21 @@ impl ExecutionPlan for HashJoinExec {
              consider using CoalescePartitionsExec or the EnforceDistribution rule"
         );
 
-        let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
+        // Only enable dynamic filter pushdown if:
+        // - The session config enables dynamic filter pushdown
+        // - A dynamic filter exists
+        // - At least one consumer is holding a reference to it, this avoids expensive filter
+        //   computation when disabled or when no consumer will use it.
+        let enable_dynamic_filter_pushdown = context
+            .session_config()
+            .options()
+            .optimizer
+            .enable_join_dynamic_filter_pushdown
+            && self
+                .dynamic_filter
+                .as_ref()
+                .map(|df| df.filter.is_used())
+                .unwrap_or(false);
 
         let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
         let left_fut = match self.mode {
@@ -4610,6 +4635,11 @@ mod tests {
         let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
         let dynamic_filter_clone = Arc::clone(&dynamic_filter);
 
+        // Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
+        let _consumer = Arc::clone(&dynamic_filter)
+            .with_new_children(vec![])
+            .unwrap();
+
         // Create HashJoinExec with the dynamic filter
         let mut join = HashJoinExec::try_new(
             left,
@@ -4658,6 +4688,11 @@ mod tests {
         let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
         let dynamic_filter_clone = Arc::clone(&dynamic_filter);
 
+        // Simulate a consumer by creating a transformed copy (what happens during filter pushdown)
+        let _consumer = Arc::clone(&dynamic_filter)
+            .with_new_children(vec![])
+            .unwrap();
+
         // Create HashJoinExec with the dynamic filter
         let mut join = HashJoinExec::try_new(
             left,

