LiaCastaneda commented on code in PR #18799:
URL: https://github.com/apache/datafusion/pull/18799#discussion_r2541475425
##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -204,13 +229,68 @@ impl DynamicFilterPhysicalExpr {
// Load the current inner, increment generation, and store the new one
let mut current = self.inner.write();
+ let new_generation = current.generation + 1;
*current = Inner {
- generation: current.generation + 1,
+ generation: new_generation,
expr: new_expr,
+ is_complete: current.is_complete,
};
+ drop(current); // Release the lock before broadcasting
+
+ // Broadcast the new state to all waiters
+ let _ = self.state_watch.send(FilterState::InProgress {
+ generation: new_generation,
+ });
Ok(())
}
+ /// Mark this dynamic filter as complete and broadcast to all waiters.
+ ///
+ /// This signals that all expected updates have been received.
+ /// Waiters using [`Self::wait_complete`] will be notified.
+ pub fn mark_complete(&self) {
+ let mut current = self.inner.write();
+ let current_generation = current.generation;
+ current.is_complete = true;
+ drop(current);
+
+ // Broadcast completion to all waiters
+ let _ = self.state_watch.send(FilterState::Complete {
+ generation: current_generation,
+ });
+ }
+
+ /// Wait asynchronously for any update to this filter.
+ ///
+ /// This method will return when [`Self::update`] is called and the generation increases.
+ /// It does not guarantee that the filter is complete.
+ pub async fn wait_update(&self) {
+ let mut rx = self.state_watch.subscribe();
+ // Get the current generation
+ let current_gen = rx.borrow_and_update().generation();
+
+ // Wait until generation increases
+ let _ = rx.wait_for(|state| state.generation() > current_gen).await;
+ }
+
+ /// Wait asynchronously until this dynamic filter is marked as complete.
+ ///
+ /// This method returns immediately if the filter is already complete.
+ /// Otherwise, it waits until [`Self::mark_complete`] is called.
+ ///
+ /// Unlike [`Self::wait_update`], this method guarantees that when it returns,
+ /// the filter is fully complete with no more updates expected.
+ pub async fn wait_complete(&self) {
+ if self.inner.read().is_complete {
Review Comment:
This is redundant with the watch channel state, but it allows us to return
immediately from `wait_complete()` without subscribing if already complete.
Otherwise `wait_complete` could be blocked indefinitely in cases like:
```
async fn test_wait_complete_already_complete() {
let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
vec![],
lit(42) as Arc<dyn PhysicalExpr>,
));
// Mark as complete immediately
dynamic_filter.mark_complete();
dynamic_filter.wait_complete().await; // It could block indefinitely here
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]