This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-52 by this push:
     new 69eca52102 [branch-52] fix: DynamicFilterPhysicalExpr violates Hash/Eq 
contract (#19659) (#19705)
69eca52102 is described below

commit 69eca52102af32517aeca3c97805b29296b5eae7
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 9 04:14:54 2026 -0500

    [branch-52] fix: DynamicFilterPhysicalExpr violates Hash/Eq contract 
(#19659) (#19705)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - part of https://github.com/apache/datafusion/issues/18566
    
    ## Rationale for this change
    
    I propose back porting the fix for
    https://github.com/apache/datafusion/issues/19641 to 52 release
    
    ## What changes are included in this PR?
    
    - Backport https://github.com/apache/datafusion/pull/19659
    
    ## Are these changes tested?
    Yes
    
    ## Are there any user-facing changes?
    
    bug fix
    
    Co-authored-by: Kumar Ujjawal <[email protected]>
---
 .../src/expressions/dynamic_filters.rs             | 123 +++++++++++++++++++--
 1 file changed, 115 insertions(+), 8 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs 
b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index fd8b266725..7703d201aa 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -26,7 +26,7 @@ use datafusion_common::{
     tree_node::{Transformed, TransformedResult, TreeNode},
 };
 use datafusion_expr::ColumnarValue;
-use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};
+use datafusion_physical_expr_common::physical_expr::DynHash;
 
 /// State of a dynamic filter, tracking both updates and completion.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -103,8 +103,11 @@ impl Inner {
 
 impl Hash for DynamicFilterPhysicalExpr {
     fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        let inner = self.current().expect("Failed to get current expression");
-        inner.dyn_hash(state);
+        // Use pointer identity of the inner Arc for stable hashing.
+        // This is stable across update() calls and consistent with Eq.
+        // See issue #19641 for details on why content-based hashing violates
+        // the Hash/Eq contract when the underlying expression can change.
+        Arc::as_ptr(&self.inner).hash(state);
         self.children.dyn_hash(state);
         self.remapped_children.dyn_hash(state);
     }
@@ -112,11 +115,13 @@ impl Hash for DynamicFilterPhysicalExpr {
 
 impl PartialEq for DynamicFilterPhysicalExpr {
     fn eq(&self, other: &Self) -> bool {
-        let inner = self.current().expect("Failed to get current expression");
-        let our_children = 
self.remapped_children.as_ref().unwrap_or(&self.children);
-        let other_children = 
other.remapped_children.as_ref().unwrap_or(&other.children);
-        let other = other.current().expect("Failed to get current expression");
-        inner.dyn_eq(other.as_any()) && our_children == other_children
+        // Two dynamic filters are equal if they share the same inner source
+        // AND have the same children configuration.
+        // This is consistent with Hash using Arc::as_ptr.
+        // See issue #19641 for details on the Hash/Eq contract violation fix.
+        Arc::ptr_eq(&self.inner, &other.inner)
+            && self.children == other.children
+            && self.remapped_children == other.remapped_children
     }
 }
 
@@ -753,4 +758,106 @@ mod test {
             "Filter should still be used with multiple consumers"
         );
     }
+
+    /// Test that verifies the Hash/Eq contract is now satisfied (issue #19641 
fix).
+    ///
+    /// After the fix, Hash uses Arc::as_ptr(&self.inner) which is stable 
across
+    /// update() calls, fixing the HashMap key instability issue.
+    #[test]
+    fn test_hash_stable_after_update() {
+        use std::collections::hash_map::DefaultHasher;
+        use std::hash::{Hash, Hasher};
+
+        // Create filter with initial value
+        let filter =
+            DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn 
PhysicalExpr>);
+
+        // Compute hash BEFORE update
+        let mut hasher_before = DefaultHasher::new();
+        filter.hash(&mut hasher_before);
+        let hash_before = hasher_before.finish();
+
+        // Update changes the underlying expression
+        filter
+            .update(lit(false) as Arc<dyn PhysicalExpr>)
+            .expect("Update should succeed");
+
+        // Compute hash AFTER update
+        let mut hasher_after = DefaultHasher::new();
+        filter.hash(&mut hasher_after);
+        let hash_after = hasher_after.finish();
+
+        // FIXED: Hash should now be STABLE after update() because we use
+        // Arc::as_ptr for identity-based hashing instead of expression 
content.
+        assert_eq!(
+            hash_before, hash_after,
+            "Hash should be stable after update() - fix for issue #19641"
+        );
+
+        // Self-equality should still hold
+        assert!(filter.eq(&filter), "Self-equality should hold");
+    }
+
+    /// Test that verifies separate DynamicFilterPhysicalExpr instances
+    /// with the same expression are NOT equal (identity-based comparison).
+    #[test]
+    fn test_identity_based_equality() {
+        // Create two separate filters with identical initial expressions
+        let filter1 =
+            DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn 
PhysicalExpr>);
+        let filter2 =
+            DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn 
PhysicalExpr>);
+
+        // Different instances should NOT be equal even with same expression
+        // because they have independent inner Arcs (different update 
lifecycles)
+        assert!(
+            !filter1.eq(&filter2),
+            "Different instances should not be equal (identity-based)"
+        );
+
+        // Self-equality should hold
+        assert!(filter1.eq(&filter1), "Self-equality should hold");
+    }
+
+    /// Test that hash is stable for the same filter instance.
+    /// After the fix, hash uses Arc::as_ptr which is pointer-based.
+    #[test]
+    fn test_hash_stable_for_same_instance() {
+        use std::collections::hash_map::DefaultHasher;
+        use std::hash::{Hash, Hasher};
+
+        let filter =
+            DynamicFilterPhysicalExpr::new(vec![], lit(true) as Arc<dyn 
PhysicalExpr>);
+
+        // Compute hash twice for the same instance
+        let hash1 = {
+            let mut h = DefaultHasher::new();
+            filter.hash(&mut h);
+            h.finish()
+        };
+        let hash2 = {
+            let mut h = DefaultHasher::new();
+            filter.hash(&mut h);
+            h.finish()
+        };
+
+        assert_eq!(hash1, hash2, "Same instance should have stable hash");
+
+        // Update the expression
+        filter
+            .update(lit(false) as Arc<dyn PhysicalExpr>)
+            .expect("Update should succeed");
+
+        // Hash should STILL be the same (identity-based)
+        let hash3 = {
+            let mut h = DefaultHasher::new();
+            filter.hash(&mut h);
+            h.finish()
+        };
+
+        assert_eq!(
+            hash1, hash3,
+            "Hash should be stable after update (identity-based)"
+        );
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to