gabotechs commented on code in PR #20416:
URL: https://github.com/apache/datafusion/pull/20416#discussion_r2851286605


##########
datafusion/proto/proto/datafusion.proto:
##########
@@ -860,6 +860,12 @@ message PhysicalExprNode {
   // across serde roundtrips.
   optional uint64 expr_id = 30;
 
+  // For DynamicFilterPhysicalExpr, this identifies the shared inner state.
+  // Multiple expressions may have different expr_id values (different outer Arc wrappers)
+  // but the same dynamic_filter_inner_id (shared inner state).
+  // Used to reconstruct shared inner state during deserialization.
+  optional uint64 dynamic_filter_inner_id = 31;
+

Review Comment:
   I think we should find ways of not leaking this detail here.



##########
datafusion/proto/src/physical_plan/mod.rs:
##########
@@ -3918,24 +3936,51 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
     where
         Self: Sized,
     {
-        if let Some(expr_id) = proto.expr_id {
-            // Check cache first
-            if let Some(cached) = self.cache.borrow().get(&expr_id) {
-                return Ok(Arc::clone(cached));
+        // The entire expr is cached, so re-use it.
+        if let Some(expr_id) = proto.expr_id
+            && let Some(cached) = self.cache.borrow().get(&expr_id)
+        {
+            return Ok(Arc::clone(cached));
+        }
+
+        // Cache miss, we must deserialize the expr.
+        let mut expr =
+            parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?;
+
+        // Check if we need to share inner state with a cached dynamic filter
+        if let Some(dynamic_filter_id) = proto.dynamic_filter_inner_id {

Review Comment:
   The amount of special-casing for dynamic filters in the protobuf code seems too big in this PR.
   
   The fact that dynamic filters claim to be normal `PhysicalExpr`s but still need special treatment in several parts of the codebase makes me think that there might be better ways of approaching things in general.



##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -327,6 +468,14 @@ impl DynamicFilterPhysicalExpr {
         Arc::strong_count(self) > 1 || Arc::strong_count(&self.inner) > 1
     }
 
+    /// Returns a unique identifier for the inner shared state.
+    ///
+    /// Useful for checking if two [`Arc<PhysicalExpr>`] with the same
+    /// underlying [`DynamicFilterPhysicalExpr`] are the same.
+    pub fn inner_id(&self) -> u64 {
+        Arc::as_ptr(&self.inner) as *const () as u64

Review Comment:
   Overall, I think the way dynamic filters currently rely on raw pointer addresses is a bit like playing with fire.
   
   For example:
   - The `is_used()` method returns `true` if the strong count of either the inner or the outer `Arc` is greater than one, but the strong count can exceed one just because of how you happen to lay out your Rust code while the filter is still unused (e.g., in optimization passes the strong count can easily be 6 or 7, and the filter is still unused).
   - Deriving a dynamic filter's unique identifier from the `u64` representation of a raw pointer address forces serialization code to call into the operating system (`std::process::id()`), which would not be necessary with a proper dynamic filter id.
   
   My impression is that using raw pointer addresses as a proxy for business-logic details in dynamic filters has gone too far, and we should be using proper ids instead.



##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -256,6 +258,47 @@ pub fn serialize_physical_expr_with_converter(
     codec: &dyn PhysicalExtensionCodec,
     proto_converter: &dyn PhysicalProtoConverterExtension,
 ) -> Result<protobuf::PhysicalExprNode> {
+    // Check for DynamicFilterPhysicalExpr before snapshotting.
+    // We need to handle it before snapshot_physical_expr because snapshot()
+    // replaces the DynamicFilterPhysicalExpr with its inner expression.
+    if let Some(df) = value.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {

Review Comment:
   I see one of the main issues is how snapshotting works with dynamic 
filtering.
   
   In the physical expression trait:
   
   ```rust
       ...
       /// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations
       /// or needs to serialize this state to bytes may not be able to handle these dynamic references.
       /// In such cases, we should return a simplified version of the `PhysicalExpr` that does not
       /// contain these dynamic references.
       ...
       fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>>;
   ```
   
   This doesn't return a snapshot of the `DynamicFilterPhysicalExpr`; it returns a snapshot of the inner computed physical expression, losing all the information of the original `DynamicFilterPhysicalExpr`.
   
   What we want in this PR is to serialize the actual `DynamicFilterPhysicalExpr`, with all its details, so `snapshot_physical_expr(Arc::clone(value))?;` should probably not be called at all for any expression, and we should be able to treat the dynamic filter as just another serializable `PhysicalExpr`.
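   
   A toy model of that behaviour, under loud assumptions: `Expr`, `Literal`, `DynamicFilter`, and `snapshot_or_self` below are hypothetical stand-ins, not DataFusion's `PhysicalExpr` trait or its `snapshot_physical_expr` helper. It only illustrates why serializing the snapshot cannot round-trip the wrapper: once `snapshot()` has run, nothing identifies the result as having come from a dynamic filter.
   
   ```rust
   use std::sync::{Arc, RwLock};
   
   // Toy stand-ins (hypothetical): not DataFusion's real trait or types.
   trait Expr {
       fn describe(&self) -> String;
   
       /// `Some(replacement)` when the expression holds dynamic state that
       /// should be replaced by its current value, `None` otherwise.
       fn snapshot(&self) -> Option<Arc<dyn Expr>> {
           None
       }
   }
   
   struct Literal(i64);
   
   impl Expr for Literal {
       fn describe(&self) -> String {
           format!("lit({})", self.0)
       }
   }
   
   /// Wrapper whose inner expression can be swapped at runtime.
   struct DynamicFilter {
       current: RwLock<Arc<dyn Expr>>,
   }
   
   impl Expr for DynamicFilter {
       fn describe(&self) -> String {
           format!("dynamic({})", self.current.read().unwrap().describe())
       }
   
       fn snapshot(&self) -> Option<Arc<dyn Expr>> {
           // Returns the inner expression only; the wrapper is dropped.
           let guard = self.current.read().unwrap();
           Some(Arc::clone(&*guard))
       }
   }
   
   /// Replace an expression by its snapshot when it has one.
   fn snapshot_or_self(expr: Arc<dyn Expr>) -> Arc<dyn Expr> {
       let snap = expr.snapshot();
       snap.unwrap_or(expr)
   }
   
   fn main() {
       let initial: Arc<dyn Expr> = Arc::new(Literal(5));
       let filter: Arc<dyn Expr> = Arc::new(DynamicFilter {
           current: RwLock::new(initial),
       });
       assert_eq!(filter.describe(), "dynamic(lit(5))");
   
       // After snapshotting, only the inner literal is left to serialize.
       let snap = snapshot_or_self(filter);
       assert_eq!(snap.describe(), "lit(5)");
   }
   ```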
   



##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -180,18 +291,52 @@ impl DynamicFilterPhysicalExpr {
         }
     }
 
+    /// Create a new [`DynamicFilterPhysicalExpr`] from `self`, except it overwrites the
+    /// internal state with the source filter's state.
+    ///
+    /// This is a low-level API intended for use by the proto deserialization layer.
+    ///
+    /// # Safety

Review Comment:
   `# Safety` sections in Rust docs are conventionally reserved for `unsafe` functions, where they spell out the implicit constraints callers must uphold to use the function soundly.
   
   As this function is not `unsafe`, I'd suggest choosing a different heading.
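   
   As a rough illustration of the convention (all names are hypothetical and the `# Caveats` heading is just one possible choice, not an established rustdoc section):
   
   ```rust
   /// Reads the value behind `ptr`.
   ///
   /// # Safety
   ///
   /// `ptr` must be non-null, properly aligned, and point to an initialized
   /// `u32` that is valid for reads.
   pub unsafe fn read_raw(ptr: *const u32) -> u32 {
       // SAFETY: the caller upholds the contract documented above.
       unsafe { *ptr }
   }
   
   /// Hypothetical holder of some state.
   pub struct Slot {
       value: u32,
   }
   
   impl Slot {
       /// Overwrites this slot's state with the state of `source`.
       ///
       /// # Caveats
       ///
       /// Low-level API intended for deserialization code only. Misuse cannot
       /// cause undefined behavior, so this is not a `# Safety` section.
       pub fn overwrite_from(&mut self, source: &Slot) {
           self.value = source.value;
       }
   }
   
   fn main() {
       let x = 7u32;
       // SAFETY: `&x` is a valid, aligned pointer to an initialized `u32`.
       assert_eq!(unsafe { read_raw(&x) }, 7);
   
       let mut a = Slot { value: 1 };
       a.overwrite_from(&Slot { value: 9 });
       assert_eq!(a.value, 9);
   }
   ```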



##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -256,6 +258,47 @@ pub fn serialize_physical_expr_with_converter(
     codec: &dyn PhysicalExtensionCodec,
     proto_converter: &dyn PhysicalProtoConverterExtension,
 ) -> Result<protobuf::PhysicalExprNode> {
+    // Check for DynamicFilterPhysicalExpr before snapshotting.
+    // We need to handle it before snapshot_physical_expr because snapshot()
+    // replaces the DynamicFilterPhysicalExpr with its inner expression.
+    if let Some(df) = value.as_any().downcast_ref::<DynamicFilterPhysicalExpr>() {

Review Comment:
   Doubling down on the above, the fact that dynamic filters cannot be treated 
as normal expressions makes me think that there might be better ways of dealing 
with them.
   
   Maybe there's an opportunity for making them normal expressions again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

