This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bef1368c33 Simplify wait_complete function (#19937)
bef1368c33 is described below

commit bef1368c3307eab0edf0c3a4b80f94d2840bb1c1
Author: Lía Adriana <[email protected]>
AuthorDate: Tue Jan 27 22:13:48 2026 +0100

    Simplify wait_complete function (#19937)
    
    ## Which issue does this PR close?
    
    
    ## Rationale for this change
    
    The current v52 signature `pub async fn wait_complete(self: &Arc<Self>)`
    (introduced in #19546) is a bit unergonomic. The method requires
    `&Arc<DynamicFilterPhysicalExpr>`, but when working with `Arc<dyn
    PhysicalExpr>`, downcasting only gives you `&DynamicFilterPhysicalExpr`.
    Since you can't convert `&DynamicFilterPhysicalExpr` to
    `Arc<DynamicFilterPhysicalExpr>`, the method becomes impossible to call.
    
    
    The `&Arc<Self>` param was used to check `is_used()` via the Arc strong
    count, but this was overly defensive.
    
    ## What changes are included in this PR?
    
    - Changed `DynamicFilterPhysicalExpr::wait_complete` signature from `pub
    async fn wait_complete(self: &Arc<Self>)` to `pub async fn
    wait_complete(&self)`.
    
    - Removed the `is_used()` check from `wait_complete()` - this method,
    like `wait_update()`, should only be called on filters that have
    consumers. If the caller doesn't know whether the filter has consumers,
    they should call `is_used()` first to avoid waiting indefinitely. This
    approach avoids complex signatures and dependencies between the API's
    methods.
    
    ## Are these changes tested?
    
    Yes, existing tests cover this functionality. I removed the "mock"
    consumer from `test_hash_join_marks_filter_complete_empty_build_side`
    and `test_hash_join_marks_filter_complete`, since the fix in
    https://github.com/apache/datafusion/pull/19734 makes `is_used()` check the
    outer struct's `strong_count` as well.
    
    
    ## Are there any user-facing changes?
    
    The signature of `wait_complete` changed.
---
 .../physical-expr/src/expressions/dynamic_filters.rs    | 17 ++++++++++-------
 datafusion/physical-plan/src/joins/hash_join/exec.rs    | 10 ----------
 2 files changed, 10 insertions(+), 17 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs 
b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index 6c961e3bb0..d285f8b377 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -276,6 +276,10 @@ impl DynamicFilterPhysicalExpr {
     ///
     /// This method will return when [`Self::update`] is called and the 
generation increases.
     /// It does not guarantee that the filter is complete.
+    ///
+    /// Producers (e.g.) HashJoinExec may never update the expression or mark 
it as completed if there are no consumers.
+    /// If you call this method on a dynamic filter created by such a producer 
and there are no consumers registered this method would wait indefinitely.
+    /// This should not happen under normal operation and would indicate a 
programming error either in your producer or in DataFusion if the producer is a 
built in node.
     pub async fn wait_update(&self) {
         let mut rx = self.state_watch.subscribe();
         // Get the current generation
@@ -287,17 +291,16 @@ impl DynamicFilterPhysicalExpr {
 
     /// Wait asynchronously until this dynamic filter is marked as complete.
     ///
-    /// This method returns immediately if the filter is already complete or 
if the filter
-    /// is not being used by any consumers.
+    /// This method returns immediately if the filter is already complete.
     /// Otherwise, it waits until [`Self::mark_complete`] is called.
     ///
     /// Unlike [`Self::wait_update`], this method guarantees that when it 
returns,
     /// the filter is fully complete with no more updates expected.
-    pub async fn wait_complete(self: &Arc<Self>) {
-        if !self.is_used() {
-            return;
-        }
-
+    ///
+    /// Producers (e.g.) HashJoinExec may never update the expression or mark 
it as completed if there are no consumers.
+    /// If you call this method on a dynamic filter created by such a producer 
and there are no consumers registered this method would wait indefinitely.
+    /// This should not happen under normal operation and would indicate a 
programming error either in your producer or in DataFusion if the producer is a 
built in node.
+    pub async fn wait_complete(&self) {
         if self.inner.read().is_complete {
             return;
         }
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs 
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 131b07461e..c249dfb10a 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -5078,11 +5078,6 @@ mod tests {
         let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
         let dynamic_filter_clone = Arc::clone(&dynamic_filter);
 
-        // Simulate a consumer by creating a transformed copy (what happens 
during filter pushdown)
-        let _consumer = Arc::clone(&dynamic_filter)
-            .with_new_children(vec![])
-            .unwrap();
-
         // Create HashJoinExec with the dynamic filter
         let mut join = HashJoinExec::try_new(
             left,
@@ -5132,11 +5127,6 @@ mod tests {
         let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
         let dynamic_filter_clone = Arc::clone(&dynamic_filter);
 
-        // Simulate a consumer by creating a transformed copy (what happens 
during filter pushdown)
-        let _consumer = Arc::clone(&dynamic_filter)
-            .with_new_children(vec![])
-            .unwrap();
-
         // Create HashJoinExec with the dynamic filter
         let mut join = HashJoinExec::try_new(
             left,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to