Re: [PR] dynamic filter refactor [datafusion]

2025-06-25 Thread via GitHub


github-actions[bot] closed pull request #15685: dynamic filter refactor
URL: https://github.com/apache/datafusion/pull/15685


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



Re: [PR] dynamic filter refactor [datafusion]

2025-06-17 Thread via GitHub


github-actions[bot] commented on PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#issuecomment-2982375299

   Thank you for your contribution. Unfortunately, this pull request is stale 
because it has been open 60 days with no activity. Please remove the stale 
label or comment or this will be closed in 7 days.





Re: [PR] dynamic filter refactor [datafusion]

2025-04-17 Thread via GitHub


berkaysynnada commented on PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#issuecomment-2813385207

   Blockers are gone. I think we can focus on this now.





Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#issuecomment-2798502900

   Hey @jayzhan211, thank you for putting the work into trying to clarify this.
   
   At this point I think it would be best to wait for #15566 or a PR that 
replaces it to be merged so that we can work against an actual use case / 
implementation of these dynamic filters. Otherwise I think it's a bit hard to 
communicate in such abstract terms. Once we're looking at a concrete use case 
it will be easier to make a PR to replace this implementation.
   
   Would it be okay with you to wait until that happens to continue this 
discussion?
   
   Sorry if merging a PR with a bad implementation becomes problematic... luckily it's problematic for us, not end users, since these are all private implementation details.





Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


jayzhan211 commented on PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#issuecomment-2798405570

   https://github.com/apache/datafusion/pull/15568#discussion_r2038773841
   
   # Why the change is equivalent to yours at a high level
   
   > 1. DynamicFilterPhysicalExpr gets initialized at planning time with a 
known set of children but a placeholder expression (lit(true))
   
   The same.
   
   > 2. with_new_children is called making a new DynamicFilterPhysicalExpr but 
with the children replaced (let's ignore how that happens internally for now)
   
   We need to replace the children, and `snapshot` achieves this and gives us the result.
   
   > 3. update is called on the original reference with an expression that 
references the original children. This is propagated to all references, 
including those with new children, because of the Arc>.
   
   We can keep the `Arc>` and update the inner expression, or even create a new source filter.
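A minimal Rust sketch of the shared-lock behaviour that point 3 relies on, under a toy model: `Expr` and `SharedFilter` are hypothetical stand-ins, not DataFusion's `PhysicalExpr` or `DynamicFilterPhysicalExpr`. Every clone of the handle points at the same `Arc<RwLock<_>>`, so an `update` made through one clone is what every other clone reads next.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical, simplified stand-in for a physical filter expression.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    /// Placeholder used before the real filter is known (compare `lit(true)`).
    True,
    /// `column = value`, with the column referenced by name.
    Eq { column: String, value: i32 },
}

/// Hypothetical handle: all clones share one lock-protected expression.
#[derive(Debug, Clone)]
pub struct SharedFilter {
    inner: Arc<RwLock<Expr>>,
}

impl SharedFilter {
    pub fn new(expr: Expr) -> Self {
        Self { inner: Arc::new(RwLock::new(expr)) }
    }

    /// Swap the expression in place. `&self` is enough because mutation goes
    /// through the lock, so every clone observes the new value afterwards.
    pub fn update(&self, expr: Expr) {
        let mut guard = self.inner.write().expect("lock poisoned");
        *guard = expr;
    }

    /// Return a copy of the current expression.
    pub fn current(&self) -> Expr {
        self.inner.read().expect("lock poisoned").clone()
    }
}
```

For example, a planner could hand one clone to a scan and keep another for a TopK-style writer; after `writer.update(...)`, `scan.current()` returns the new expression without the scan being rebuilt.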
   
   > 4. evaluate is called on one of the references that previously had 
with_new_children called on it. Since update was called, which swapped out 
inner, the children of this new inner need to be remapped to the children that 
we currently expose externally.
   
   We can call `evaluate` on `snapshot` because `snapshot` is already remapped. Your version needs to remap on every `evaluate` call, whereas my version remaps once when creating the `snapshot` and then evaluates on it without remapping.
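To illustrate the cost argument, here is a hypothetical sketch (the `ColumnEq` type and the `remap` helper are illustrative, not DataFusion APIs) that rebinds a column index by name and counts how often that happens. Both strategies give the same results; remapping per evaluation repeats the lookup for every batch, while a snapshot does it once. Each "row" below stands in for a batch.

```rust
/// Hypothetical filter `column = value`; `index` is the column's position in a schema.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnEq {
    pub name: String,
    pub index: usize,
    pub value: i32,
}

/// Rebind `index` by looking `name` up in `schema`, counting each call.
fn remap(expr: &ColumnEq, schema: &[&str], remaps: &mut usize) -> Option<ColumnEq> {
    *remaps += 1;
    let index = schema.iter().position(|field| *field == expr.name)?;
    Some(ColumnEq { index, ..expr.clone() })
}

/// Evaluate an already-remapped filter against one row.
fn matches(expr: &ColumnEq, row: &[i32]) -> bool {
    row.get(expr.index) == Some(&expr.value)
}

/// Remap inside every evaluation. Returns (results, number of remaps).
pub fn remap_per_evaluate(
    source: &ColumnEq,
    schema: &[&str],
    rows: &[Vec<i32>],
) -> Option<(Vec<bool>, usize)> {
    let mut remaps = 0;
    let mut results = Vec::with_capacity(rows.len());
    for row in rows {
        let expr = remap(source, schema, &mut remaps)?;
        results.push(matches(&expr, row));
    }
    Some((results, remaps))
}

/// Remap once into a snapshot, then evaluate every row against it.
pub fn remap_once(
    source: &ColumnEq,
    schema: &[&str],
    rows: &[Vec<i32>],
) -> Option<(Vec<bool>, usize)> {
    let mut remaps = 0;
    let snapshot = remap(source, schema, &mut remaps)?;
    let results: Vec<bool> = rows.iter().map(|row| matches(&snapshot, row)).collect();
    Some((results, remaps))
}
```

With three rows, `remap_per_evaluate` reports three remaps and `remap_once` reports one, while both return identical match results.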
   
   # Improvements from this change
   1. `with_new_children` is now correct because it updates the source filter.
   2. `DynamicFilterPhysicalExpr` is basically `filter expression: Arc>`. We now have a simpler interface with the same capability.
   
   # Concerns
   1. `reassign_predicate_columns` is replaced by `snapshot`, but you think we can't make this kind of change because of API churn. I don't think this is an issue because it isn't used in many places.
   2. Do we need a lock for the source filter at all? I think we could create a new `DynamicFilterPhysicalExpr` instead. But maybe there are reasons we can't; we can discuss this further.
   





Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040512866


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -159,35 +139,13 @@ impl DynamicFilterPhysicalExpr {
 )
 })?
 .clone();
-let inner =
-Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;

Review Comment:
   `snapshot` is actually the dynamic filter evaluated against the schema.
   
   Basically, what you have is just the filter expression. You provide the schema to remap the column indexes, and you get yet another filter expression.






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040512371


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -159,35 +139,13 @@ impl DynamicFilterPhysicalExpr {
 )
 })?
 .clone();
-let inner =
-Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;

Review Comment:
   I think the difference is:
   
   Your version
   1. we have source filter A
   2. create dynamic filters a and b with different filter schemas
   3. you create snapshots a and b from them
   4. evaluate batches by snapshot
   5. update source filter A to B
   6. dynamic filters a and b remap based on source filter B when you call `evaluate`
   
   My version
   1. we have source filter A
   2. create snapshots based on source filter A + filter schemas A and B
   3. evaluate batches by snapshot
   4. update source filter A to B
   5. create snapshots based on source filter B + filter schemas A and B
   6. evaluate batches by snapshot
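The practical difference between the two lists is when a reader sees source filter B. A hypothetical sketch (`ColumnEq`, `LazyFilter`, and `snapshot` are illustrative names, not DataFusion APIs): a reader that keeps the shared source and remaps on each `evaluate` picks up the update immediately, whereas a snapshot taken before the update keeps filter A until a new snapshot is created, which is step 5 of the second list.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical filter `column = value`; `index` is the column's position in a schema.
#[derive(Debug, Clone, PartialEq)]
pub struct ColumnEq {
    pub name: String,
    pub index: usize,
    pub value: i32,
}

/// Rebind `index` by looking `name` up in `schema`.
fn remap(expr: &ColumnEq, schema: &[String]) -> Option<ColumnEq> {
    let index = schema.iter().position(|field| *field == expr.name)?;
    Some(ColumnEq { index, ..expr.clone() })
}

/// First list: keep the shared source plus a schema, remap on every evaluation.
pub struct LazyFilter {
    pub source: Arc<RwLock<ColumnEq>>,
    pub schema: Vec<String>,
}

impl LazyFilter {
    pub fn evaluate(&self, row: &[i32]) -> Option<bool> {
        let source = self.source.read().expect("lock poisoned");
        // Reads whatever the source holds *now*, so updates are seen immediately.
        let expr = remap(&source, &self.schema)?;
        Some(row.get(expr.index) == Some(&expr.value))
    }
}

/// Second list: remap the source's current value into a standalone snapshot.
/// Later updates to the source do not affect a snapshot already taken.
pub fn snapshot(source: &RwLock<ColumnEq>, schema: &[String]) -> Option<ColumnEq> {
    let current = source.read().expect("lock poisoned");
    remap(&current, schema)
}
```

Both designs end up evaluating the same remapped expression; they differ in whether "see the new source" happens implicitly on each `evaluate` or explicitly by re-creating the snapshot.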






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040510160


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -36,16 +36,8 @@ use super::Column;
 /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
 #[derive(Debug)]
 pub struct DynamicFilterPhysicalExpr {
-/// The original children of this PhysicalExpr, if any.
-/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
-/// and later remapped to the actual expressions that are being filtered.
-/// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly.
-// columns: Vec>,
-// /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
-// /// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
-remapped_schema: Option,
 /// The source of dynamic filters.
-inner: Arc>,
+inner: PhysicalExprRef,

Review Comment:
   > The whole point is that you can create a filter at planning time, bind it 
to a ParquetSource and a SortExec (for example) and then the SortExec can 
dynamically update it at runtime.
   
   Instead of sending the filter down, my change sends the filter schema down. It is used to create another filter (the snapshot) in SortExec dynamically at runtime.






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040505924


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
 ]));
 // Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
 // and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));

Review Comment:
   If you really want to keep `reassign_predicate_columns` for whatever reason, you should pass the `inner` of `DynamicFilterPhysicalExpr` instead, so that you only modify the `inner` and not the whole `DynamicFilterPhysicalExpr`. The difference is that `with_new_children` is then not called at the `DynamicFilterPhysicalExpr` level.






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040504133


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
 ]));
 // Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
 // and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));

Review Comment:
   > otherwise that's a ton of API churn.
   
   We only use it in `DatafusionArrowPredicate`; are there any other places we need to change?
   
   The main point is that `with_new_children` on the main branch isn't doing the right thing: it should update the source filter, but you only update the remapped filter schema. I think the filter schema is a "parameter" for remapping column indexes; it doesn't need to be part of the filter expression at all.






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040471645


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
 ]));
 // Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
 // and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));

Review Comment:
   > We need reassign_predicate_columns to work, that's what gets called from 
within ParquetSource, etc
   
   We can also change `reassign_predicate_columns` inside Parquet if that brings us to a better state. In my view, `reassign_predicate_columns` is the root cause of why you end up with a `with_new_children` that doesn't actually update the "children".






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040469037


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -36,16 +36,8 @@ use super::Column;
 /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
 #[derive(Debug)]
 pub struct DynamicFilterPhysicalExpr {
-/// The original children of this PhysicalExpr, if any.
-/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
-/// and later remapped to the actual expressions that are being filtered.
-/// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly.
-// columns: Vec>,
-// /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
-// /// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
-remapped_schema: Option,
 /// The source of dynamic filters.
-inner: Arc>,
+inner: PhysicalExprRef,

Review Comment:
   But how do you pipe the new filter down into other operators?
   
   The whole point is that you can create a filter at planning time, bind it to 
a ParquetSource and a SortExec (for example) and then the SortExec can 
dynamically update it at runtime.






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040467822


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -159,35 +139,13 @@ impl DynamicFilterPhysicalExpr {
 )
 })?
 .clone();
-let inner =
-Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;

Review Comment:
   I feel like we're going in circles... this is the thing that was expected to 
be used to produce the snapshots... now we're using snapshots to produce it?






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040466799


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
 ]));
 // Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
 // and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));

Review Comment:
   Right, but the constraint is that we can't modify what `reassign_predicate_columns` and similar functions do internally, otherwise that's a ton of API churn. The existing design works within the confines of the existing APIs.






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040452282


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -36,16 +36,8 @@ use super::Column;
 /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
 #[derive(Debug)]
 pub struct DynamicFilterPhysicalExpr {
-/// The original children of this PhysicalExpr, if any.
-/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
-/// and later remapped to the actual expressions that are being filtered.
-/// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly.
-// columns: Vec>,
-// /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
-// /// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
-remapped_schema: Option,
 /// The source of dynamic filters.
-inner: Arc>,
+inner: PhysicalExprRef,

Review Comment:
   I don't think we need it. Given a source filter, you create a snapshot with the schema, then evaluate based on the remapped filter. When you need a new source filter, instead of updating it, just create a new one.
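A minimal sketch of what "just create a new one" means for holders of the old filter, in plain Rust terms (`Expr`, `ScanOperator`, and `TopKOperator` are hypothetical): rebinding a field to a fresh `Arc` changes only that field. An operator that cloned the old `Arc` at planning time keeps the old expression unless the new one is handed to it through some other channel, which is the question of how the new filter gets piped down to other operators, raised elsewhere in this thread.

```rust
use std::sync::Arc;

/// Hypothetical, simplified filter expression.
#[derive(Debug, PartialEq)]
pub enum Expr {
    True,
    Gt(i32),
}

/// Hypothetical reader that captured the filter at planning time.
pub struct ScanOperator {
    pub filter: Arc<Expr>,
}

/// Hypothetical writer that "updates" by building a brand-new filter.
pub struct TopKOperator {
    pub filter: Arc<Expr>,
}

impl TopKOperator {
    pub fn replace_filter(&mut self, threshold: i32) {
        // Rebinds only this field; other clones of the old Arc are untouched.
        self.filter = Arc::new(Expr::Gt(threshold));
    }
}
```

After `topk.replace_filter(10)`, the writer sees `Gt(10)` but the scan still holds `True`; propagating the change would need either a shared, interior-mutable cell or an explicit hand-off of the new `Arc`.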






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040451293


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -159,35 +139,13 @@ impl DynamicFilterPhysicalExpr {
 )
 })?
 .clone();
-let inner =
-Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;

Review Comment:
   Now you evaluate based on the snapshot. The snapshot is the filter remapped with your filter schema.






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040450375


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
 ]));
 // Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
 // and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));

Review Comment:
   Where you need `reassign_predicate_columns`, it basically re-projects columns based on the provided schema.
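A rough, hypothetical analogue of that re-projection on a toy expression tree (`Expr` and `reproject` are illustrative, not DataFusion's `reassign_predicate_columns`): every column reference keeps its name and gets the index of that name in the target schema, so the same `a = 42` filter can come out with `index: 0` under one schema and `index: 1` under another, as in the two snapshots in the quoted test.

```rust
/// Hypothetical, simplified expression tree.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column { name: String, index: usize },
    Literal(i32),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// Rewrite every column index to the position of its name in `schema`.
/// Returns an error if a referenced column is missing from the schema.
pub fn reproject(expr: &Expr, schema: &[&str]) -> Result<Expr, String> {
    Ok(match expr {
        Expr::Column { name, .. } => {
            let index = schema
                .iter()
                .position(|field| *field == name.as_str())
                .ok_or_else(|| format!("column {name} not found in schema"))?;
            Expr::Column { name: name.clone(), index }
        }
        Expr::Literal(value) => Expr::Literal(*value),
        // Recurse into both sides; the tree shape is preserved.
        Expr::Eq(left, right) => Expr::Eq(
            Box::new(reproject(left, schema)?),
            Box::new(reproject(right, schema)?),
        ),
        Expr::And(left, right) => Expr::And(
            Box::new(reproject(left, schema)?),
            Box::new(reproject(right, schema)?),
        ),
    })
}
```

The schema acts purely as an input to the rewrite here, which is the sense in which the thread describes it as a "parameter" for remapping column indexes.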






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2039832734


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -105,47 +97,44 @@ impl DynamicFilterPhysicalExpr {
 inner: Arc,
 ) -> Self {
 Self {
-// columns: children,
-// remapped_columns: None, // Initially no remapped children
-remapped_schema: None,
-// remapped_filter: None,
-inner: Arc::new(RwLock::new(inner)),
+inner,
 data_type: Arc::new(RwLock::new(None)),
 nullable: Arc::new(RwLock::new(None)),
 }
 }
 
 // udpate schema
-pub fn with_schema(
-&self,
-schema: SchemaRef,
-) -> Self {
-Self {
-remapped_schema: Some(schema),
-inner: Arc::clone(&self.inner),
-data_type: Arc::clone(&self.data_type),
-nullable: Arc::clone(&self.nullable),
-}
-}
+// pub fn with_schema(&self, schema: SchemaRef) -> Self {
+// Self {
+// remapped_schema: Some(schema),
+// inner: Arc::clone(&self.inner),
+// data_type: Arc::clone(&self.data_type),
+// nullable: Arc::clone(&self.nullable),
+// }
+// }
 
 // get the source filter
 pub fn current(&self) -> Result> {
-let inner = self
-.inner
-.read()
-.map_err(|_| {
-datafusion_common::DataFusionError::Execution(
-"Failed to acquire read lock for inner".to_string(),
-)
-})?
-.clone();
+let inner = Arc::clone(&self.inner);
+
+// let inner = self
+// .inner
+// .read()
+// .map_err(|_| {
+// datafusion_common::DataFusionError::Execution(
+// "Failed to acquire read lock for inner".to_string(),
+// )
+// })?
+// .clone();
 Ok(inner)
 }
 
 // update source filter
-pub fn update(&self, filter: PhysicalExprRef) {
-let mut w = self.inner.write().unwrap();
-*w = filter;
+// create a new one
+pub fn update(&mut self, filter: PhysicalExprRef) {
+self.inner = filter;

Review Comment:
   How will writers have mutable access to this if they have to package it up 
in an `Arc`?
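In plain Rust, a `&mut self` method such as the proposed `update(&mut self, filter)` is only reachable through an `Arc` when that `Arc` is the sole owner. A small sketch (the `Filter` type and `try_update` helper are hypothetical) using the standard `Arc::get_mut`, which returns `None` as soon as another clone exists:

```rust
use std::sync::Arc;

/// Hypothetical filter with an `update(&mut self, ..)` method, shaped like the one in the diff.
#[derive(Debug)]
pub struct Filter {
    pub expr: String,
}

impl Filter {
    pub fn update(&mut self, expr: &str) {
        self.expr = expr.to_string();
    }
}

/// Attempt an in-place update through an `Arc`.
/// Succeeds only when no other `Arc` (or `Weak`) points at the same value.
pub fn try_update(shared: &mut Arc<Filter>, expr: &str) -> bool {
    match Arc::get_mut(shared) {
        Some(filter) => {
            filter.update(expr);
            true
        }
        None => false,
    }
}
```

`Arc::make_mut` is the other standard option, but when the value is shared it clones it first (it requires `T: Clone`), so existing readers would keep the old copy rather than see the update.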



##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -36,16 +36,8 @@ use super::Column;
 /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
 #[derive(Debug)]
 pub struct DynamicFilterPhysicalExpr {
-/// The original children of this PhysicalExpr, if any.
-/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
-/// and later remapped to the actual expressions that are being filtered.
-/// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly.
-// columns: Vec>,
-// /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
-// /// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
-remapped_schema: Option,
 /// The source of dynamic filters.
-inner: Arc>,
+inner: PhysicalExprRef,

Review Comment:
   @jayzhan211 how can this have multiple readers and a writer updating with 
some sort of write lock?
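For reference, the standard-library pattern for "many readers, one writer" is `Arc<RwLock<T>>`: readers take `read()` guards concurrently and the writer takes an exclusive `write()` guard. A minimal sketch with scoped threads, where an `i32` threshold stands in for the filter and `readers_and_writer` is a hypothetical helper. Each reader observes either the old or the new value, never a partial write.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Spawn `readers` reader threads and one writer that swaps the shared value
/// from `old` to `new`. Returns what each reader observed and the final value.
pub fn readers_and_writer(readers: usize, old: i32, new: i32) -> (Vec<i32>, i32) {
    let filter = Arc::new(RwLock::new(old));

    let observed = thread::scope(|s| {
        let handles: Vec<_> = (0..readers)
            .map(|_| {
                let f = Arc::clone(&filter);
                // Shared read lock: many readers may hold it at the same time.
                s.spawn(move || {
                    let value = *f.read().expect("lock poisoned");
                    value
                })
            })
            .collect();

        let f = Arc::clone(&filter);
        // Exclusive write lock: waits until no reader holds the lock.
        s.spawn(move || {
            let mut guard = f.write().expect("lock poisoned");
            *guard = new;
        });

        handles
            .into_iter()
            .map(|h| h.join().expect("reader panicked"))
            .collect::<Vec<i32>>()
    });

    // `thread::scope` joins every spawned thread (including the writer) before returning.
    let final_value = *filter.read().expect("lock poisoned");
    (observed, final_value)
}
```

Which readers see the old versus the new value depends on scheduling; only the final value is deterministic.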






Re: [PR] dynamic filter refactor [datafusion]

2025-04-11 Thread via GitHub


adriangb commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2039623779


##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
 ]));
 // Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
 // and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));

Review Comment:
   What is expected to call `with_schema`? This seems like a new method on 
`DynamicFilterPhysicalExpr`. We need `reassign_predicate_columns` to work, 
that's what gets called from within `ParquetSource`, etc.



##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -159,35 +139,13 @@ impl DynamicFilterPhysicalExpr {
 )
 })?
 .clone();
-let inner =
-Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;

Review Comment:
   This means that `evaluate` no longer uses a version with remapped children, right?


