This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bef1368c33 Simplify wait_complete function (#19937)
bef1368c33 is described below
commit bef1368c3307eab0edf0c3a4b80f94d2840bb1c1
Author: Lía Adriana <[email protected]>
AuthorDate: Tue Jan 27 22:13:48 2026 +0100
Simplify wait_complete function (#19937)
## Which issue does this PR close?
## Rationale for this change
The current v52 signature `pub async fn wait_complete(self: &Arc<Self>)`
(introduced in #19546) is a bit unergonomic. The method requires
`&Arc<DynamicFilterPhysicalExpr>`, but when working with `Arc<dyn
PhysicalExpr>`, downcasting only gives you `&DynamicFilterPhysicalExpr`.
Since you can't convert `&DynamicFilterPhysicalExpr` to
`Arc<DynamicFilterPhysicalExpr>`, the method becomes impossible to call.
The `&Arc<Self>` parameter was used to check `is_used()` via the Arc strong
count, but this was overly defensive.
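
A minimal sketch of the ergonomics problem, using only the standard library. `Filter`, `is_complete_arc`, `is_complete_ref`, and `check` are hypothetical stand-ins for illustration, and `dyn Any` stands in for the trait object; none of this is DataFusion code.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`.
struct Filter {
    complete: bool,
}

impl Filter {
    /// Old-style receiver: callable only when the caller holds an `Arc<Filter>`.
    #[allow(dead_code)]
    fn is_complete_arc(self: &Arc<Self>) -> bool {
        self.complete
    }

    /// New-style receiver: callable on any `&Filter`.
    fn is_complete_ref(&self) -> bool {
        self.complete
    }
}

/// Downcasting a type-erased `Arc` yields `&Filter`, not `&Arc<Filter>`.
fn check(expr: &Arc<dyn Any + Send + Sync>) -> Option<bool> {
    let filter: &Filter = (**expr).downcast_ref::<Filter>()?;
    // filter.is_complete_arc(); // would not compile: receiver must be `&Arc<Filter>`
    Some(filter.is_complete_ref())
}
```

With a plain `&self` receiver the method stays callable after the downcast, which is the situation the signature change addresses.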
## What changes are included in this PR?
- Changed `DynamicFilterPhysicalExpr::wait_complete` signature from `pub
async fn wait_complete(self: &Arc<Self>)` to `pub async fn
wait_complete(&self)`.
- Removed the `is_used()` check from `wait_complete()` - this method,
like `wait_update()`, should only be called on filters that have
consumers. If the caller doesn't know whether the filter has consumers,
they should call `is_used()` first to avoid waiting indefinitely. This
approach avoids complex signatures and dependencies between the API's
methods.
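
The caller-side pattern described above can be sketched as follows. This is a simplified, blocking stand-in built on `Mutex` and `Condvar`, not the async implementation in DataFusion; `Filter`, `consumer_handle`, and `wait_if_used` are hypothetical names, and the strong-count check is only an approximation of what `is_used()` does.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical, synchronous stand-in for a dynamic filter.
struct Filter {
    // (is_complete flag, condition variable used to wake waiters)
    state: Arc<(Mutex<bool>, Condvar)>,
}

impl Filter {
    fn new() -> Self {
        Self { state: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    /// Hands a shared handle to a consumer, raising the strong count.
    fn consumer_handle(&self) -> Arc<(Mutex<bool>, Condvar)> {
        Arc::clone(&self.state)
    }

    /// True if something besides the producer holds the shared state.
    fn is_used(&self) -> bool {
        Arc::strong_count(&self.state) > 1
    }

    /// Sets the completion flag and wakes every waiter.
    fn mark_complete(&self) {
        let (flag, cvar) = &*self.state;
        *flag.lock().unwrap() = true;
        cvar.notify_all();
    }

    /// Blocks until `mark_complete` has been called; performs no `is_used` check.
    fn wait_complete(&self) {
        let (flag, cvar) = &*self.state;
        let mut done = flag.lock().unwrap();
        while !*done {
            done = cvar.wait(done).unwrap();
        }
    }
}

/// Caller-side guard: wait only when the filter has consumers.
/// Returns `false` if the wait was skipped.
fn wait_if_used(filter: &Filter) -> bool {
    if !filter.is_used() {
        return false;
    }
    filter.wait_complete();
    true
}
```

Keeping the guard in the caller means `wait_complete` needs nothing beyond `&self`.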
## Are these changes tested?
Yes, existing tests cover this functionality. I removed the "mock"
consumer from `test_hash_join_marks_filter_complete_empty_build_side`
and `test_hash_join_marks_filter_complete`, since the fix in
https://github.com/apache/datafusion/pull/19734 makes `is_used` check the
outer struct's `strong_count` as well.
## Are there any user-facing changes?
The signature of `wait_complete` changed.
---
.../physical-expr/src/expressions/dynamic_filters.rs | 17 ++++++++++-------
datafusion/physical-plan/src/joins/hash_join/exec.rs | 10 ----------
2 files changed, 10 insertions(+), 17 deletions(-)
diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index 6c961e3bb0..d285f8b377 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -276,6 +276,10 @@ impl DynamicFilterPhysicalExpr {
///
/// This method will return when [`Self::update`] is called and the
generation increases.
/// It does not guarantee that the filter is complete.
+ ///
+ /// Producers (e.g.) HashJoinExec may never update the expression or mark
it as completed if there are no consumers.
+ /// If you call this method on a dynamic filter created by such a producer
and there are no consumers registered this method would wait indefinitely.
+ /// This should not happen under normal operation and would indicate a
programming error either in your producer or in DataFusion if the producer is a
built in node.
pub async fn wait_update(&self) {
let mut rx = self.state_watch.subscribe();
// Get the current generation
@@ -287,17 +291,16 @@ impl DynamicFilterPhysicalExpr {
/// Wait asynchronously until this dynamic filter is marked as complete.
///
- /// This method returns immediately if the filter is already complete or
if the filter
- /// is not being used by any consumers.
+ /// This method returns immediately if the filter is already complete.
/// Otherwise, it waits until [`Self::mark_complete`] is called.
///
/// Unlike [`Self::wait_update`], this method guarantees that when it
returns,
/// the filter is fully complete with no more updates expected.
- pub async fn wait_complete(self: &Arc<Self>) {
- if !self.is_used() {
- return;
- }
-
+ ///
+ /// Producers (e.g.) HashJoinExec may never update the expression or mark
it as completed if there are no consumers.
+ /// If you call this method on a dynamic filter created by such a producer
and there are no consumers registered this method would wait indefinitely.
+ /// This should not happen under normal operation and would indicate a
programming error either in your producer or in DataFusion if the producer is a
built in node.
+ pub async fn wait_complete(&self) {
if self.inner.read().is_complete {
return;
}
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 131b07461e..c249dfb10a 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -5078,11 +5078,6 @@ mod tests {
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
- // Simulate a consumer by creating a transformed copy (what happens
during filter pushdown)
- let _consumer = Arc::clone(&dynamic_filter)
- .with_new_children(vec![])
- .unwrap();
-
// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,
@@ -5132,11 +5127,6 @@ mod tests {
let dynamic_filter = HashJoinExec::create_dynamic_filter(&on);
let dynamic_filter_clone = Arc::clone(&dynamic_filter);
- // Simulate a consumer by creating a transformed copy (what happens
during filter pushdown)
- let _consumer = Arc::clone(&dynamic_filter)
- .with_new_children(vec![])
- .unwrap();
-
// Create HashJoinExec with the dynamic filter
let mut join = HashJoinExec::try_new(
left,