Re: [PR] dynamic filter refactor [datafusion]
github-actions[bot] closed pull request #15685: dynamic filter refactor URL: https://github.com/apache/datafusion/pull/15685 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] - To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
Re: [PR] dynamic filter refactor [datafusion]
github-actions[bot] commented on PR #15685: URL: https://github.com/apache/datafusion/pull/15685#issuecomment-2982375299 Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment or this will be closed in 7 days.
Re: [PR] dynamic filter refactor [datafusion]
berkaysynnada commented on PR #15685: URL: https://github.com/apache/datafusion/pull/15685#issuecomment-2813385207 Blockers are gone. I think we can focus on this now
Re: [PR] dynamic filter refactor [datafusion]
adriangb commented on PR #15685: URL: https://github.com/apache/datafusion/pull/15685#issuecomment-2798502900 Hey @jayzhan211, thank you for putting the work into trying to clarify this. At this point I think it would be best to wait for #15566, or a PR that replaces it, to be merged so that we can work against an actual use case / implementation of these dynamic filters. Otherwise I think it's a bit hard to communicate in such abstract terms. Once we're looking at a concrete use case it will be easier to make a PR to replace this implementation. Would it be okay with you to wait until that happens to continue this discussion? Sorry if merging a PR with a bad implementation becomes problematic... luckily it's problematic for us, not for end users, since these are all private implementation details.
Re: [PR] dynamic filter refactor [datafusion]
jayzhan211 commented on PR #15685: URL: https://github.com/apache/datafusion/pull/15685#issuecomment-2798405570 https://github.com/apache/datafusion/pull/15568#discussion_r2038773841
# Why the change is equivalent to yours in the high-level idea
> 1. DynamicFilterPhysicalExpr gets initialized at planning time with a known set of children but a placeholder expression (lit(true))
The same.
> 2. with_new_children is called making a new DynamicFilterPhysicalExpr but with the children replaced (let's ignore how that happens internally for now)
We need to replace the children, and we can achieve this and get the result via `snapshot`.
> 3. update is called on the original reference with an expression that references the original children. This is propagated to all references, including those with new children, because of the Arc>.
We can keep `Arc>` and update the inner filter, or even create a new source filter.
> 4. evaluate is called on one of the references that previously had with_new_children called on it. Since update was called, which swapped out inner, the children of this new inner need to be remapped to the children that we currently expose externally.
We can call `evaluate` on `snapshot` because `snapshot` is already remapped. Your version needs to remap on every `evaluate` call, whereas in my version `snapshot` does the remap once and we evaluate on it without remapping.
# The improvements of this change
1. We have a correct `with_new_children` because we now update the source filter.
2. `DynamicFilterPhysicalExpr` is basically `filter expression: Arc>`. We now have a simpler interface with the same capability.
# Concerns
1. `reassign_predicate_columns` is replaced by `snapshot`, but you think we can't make this kind of change because of API churn. I think this is not an issue because it is not used in many places.
2. Do we need a lock for the source filter at all? I think we can create a new `DynamicFilterPhysicalExpr` instead. But maybe there are reasons we can't; we can discuss this further.
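A minimal std-only Rust sketch of the four quoted steps, for readers comparing the two designs. `Expr`, `remap` and `DynamicFilter` are hypothetical toy types, not DataFusion's API. Every handle shares one source filter behind `Arc<RwLock<Expr>>`, each handle may carry its own schema, and the column remap is redone on every read (the per-`evaluate` cost described in this comment).

```rust
use std::sync::{Arc, RwLock};

/// Toy expression tree (hypothetical stand-in for a physical expression).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Bool(bool),
    Literal(i32),
    Column { name: String, index: usize },
    Eq(Box<Expr>, Box<Expr>),
}

/// Rebind column indexes by looking each column name up in `schema`.
fn remap(expr: &Expr, schema: &[String]) -> Option<Expr> {
    Some(match expr {
        Expr::Column { name, .. } => Expr::Column {
            name: name.clone(),
            index: schema.iter().position(|f| f == name)?,
        },
        Expr::Eq(l, r) => Expr::Eq(Box::new(remap(l, schema)?), Box::new(remap(r, schema)?)),
        other => other.clone(),
    })
}

/// Hypothetical dynamic filter: all handles share `inner`; `schema` is per handle.
#[derive(Debug, Clone)]
struct DynamicFilter {
    inner: Arc<RwLock<Expr>>,
    schema: Option<Vec<String>>,
}

impl DynamicFilter {
    /// Step 1: created at planning time with a placeholder such as `true`.
    fn new(placeholder: Expr) -> Self {
        Self { inner: Arc::new(RwLock::new(placeholder)), schema: None }
    }

    /// Step 2 (rough analogue of `with_new_children`): new handle, same shared source.
    fn with_schema(&self, schema: Vec<String>) -> Self {
        Self { inner: Arc::clone(&self.inner), schema: Some(schema) }
    }

    /// Step 3: replace the shared source; every handle observes the change.
    fn update(&self, filter: Expr) {
        *self.inner.write().unwrap() = filter;
    }

    /// Step 4: read the current source and remap it for this handle, on every call.
    fn current(&self) -> Option<Expr> {
        let source = self.inner.read().unwrap().clone();
        match &self.schema {
            Some(schema) => remap(&source, schema),
            None => Some(source),
        }
    }
}
```

In this model the handle created in step 2 never needs to be told about the update in step 3; it picks up the new source and remaps it lazily.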
Re: [PR] dynamic filter refactor [datafusion]
jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040512866
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -159,35 +139,13 @@ impl DynamicFilterPhysicalExpr {
)
})?
.clone();
-let inner =
-Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;
Review Comment:
`snapshot` is effectively the dynamic filter resolved against the schema.
Basically, what you have is just the filter expression: you provide the
schema to remap the column indexes, and you get back yet another filter expression.
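A std-only sketch of `snapshot` as described here: a pure function from (filter expression, schema) to a new filter expression. The toy `Expr` enum and the name-based lookup are assumptions for illustration, not DataFusion's `PhysicalExpr` or `reassign_predicate_columns`. The test mirrors the shape of the diff's assertions (index 0 for one schema, index 1 for the other) using assumed field orders.

```rust
/// Toy expression tree (hypothetical; not DataFusion's `PhysicalExpr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column { name: String, index: usize },
    Literal(i32),
    Eq(Box<Expr>, Box<Expr>),
}

/// "Snapshot": filter expression + schema in, a new remapped filter expression out.
/// The input is left untouched; `None` means a referenced column is not in `schema`.
fn snapshot(filter: &Expr, schema: &[&str]) -> Option<Expr> {
    match filter {
        Expr::Column { name, .. } => {
            let index = schema.iter().position(|field| *field == name.as_str())?;
            Some(Expr::Column { name: name.clone(), index })
        }
        Expr::Literal(value) => Some(Expr::Literal(*value)),
        Expr::Eq(left, right) => Some(Expr::Eq(
            Box::new(snapshot(left, schema)?),
            Box::new(snapshot(right, schema)?),
        )),
    }
}
```

Because the schema is only an argument here, the same source expression can be resolved against any number of schemas without any of them becoming part of the expression.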
Re: [PR] dynamic filter refactor [datafusion]
jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040512371
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -159,35 +139,13 @@ impl DynamicFilterPhysicalExpr {
)
})?
.clone();
-let inner =
-Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;
Review Comment:
I think the difference is this.
Your version:
1. We have source filter A.
2. Create dynamic filters a and b with different filter schemas.
3. Create snapshots a and b from them.
4. Evaluate batches with the snapshots.
5. Update source filter A to B.
6. Dynamic filters a and b are remapped against source filter B whenever `evaluate` is called.
My version:
1. We have source filter A.
2. Create snapshots from source filter A plus filter schemas A and B.
3. Evaluate batches with the snapshots.
4. Update source filter A to B.
5. Create snapshots from source filter B plus filter schemas A and B.
6. Evaluate batches with the snapshots.
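The "My version" steps can be sketched with the same kind of toy model (hypothetical `Expr`, `snapshot` and `snapshot_all`; std only). The source filter is a plain immutable value, each consumer schema gets its own snapshot, and replacing the source means rebuilding the snapshots. Snapshots built earlier keep the old filter.

```rust
/// Toy expression tree (hypothetical; not DataFusion's `PhysicalExpr`).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column { name: String, index: usize },
    Literal(i32),
    Eq(Box<Expr>, Box<Expr>),
}

/// Remap column indexes of `filter` against `schema` (field names); `None` if a column is missing.
fn snapshot(filter: &Expr, schema: &[&str]) -> Option<Expr> {
    Some(match filter {
        Expr::Column { name, .. } => Expr::Column {
            name: name.clone(),
            index: schema.iter().position(|field| *field == name.as_str())?,
        },
        Expr::Literal(value) => Expr::Literal(*value),
        Expr::Eq(left, right) => Expr::Eq(
            Box::new(snapshot(left, schema)?),
            Box::new(snapshot(right, schema)?),
        ),
    })
}

/// Steps 2 and 5: one snapshot per consumer schema, built from the source as it is now.
fn snapshot_all(source: &Expr, schemas: &[&[&str]]) -> Vec<Option<Expr>> {
    schemas.iter().map(|schema| snapshot(source, schema)).collect()
}
```

The trade-off shown is that remapping happens once per (source, schema) pair rather than once per `evaluate`, at the cost of someone having to rebuild and redistribute snapshots after each source update.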
Re: [PR] dynamic filter refactor [datafusion]
jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040510160
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -36,16 +36,8 @@ use super::Column;
/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
#[derive(Debug)]
pub struct DynamicFilterPhysicalExpr {
-/// The original children of this PhysicalExpr, if any.
-/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
-/// and later remapped to the actual expressions that are being filtered.
-/// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly.
-// columns: Vec>,
-// /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
-// /// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
-remapped_schema: Option,
/// The source of dynamic filters.
-inner: Arc>,
+inner: PhysicalExprRef,
Review Comment:
> The whole point is that you can create a filter at planning time, bind it
to a ParquetSource and a SortExec (for example) and then the SortExec can
dynamically update it at runtime.
Instead of sending the filter down, my change sends the filter schema down.
The schema is then used to create another filter (the snapshot) in SortExec
dynamically at runtime.
Re: [PR] dynamic filter refactor [datafusion]
jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040505924
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
]));
// Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
// and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));
Review Comment:
If you really want to keep `reassign_predicate_columns` for whatever reason,
you should pass the `inner` of `DynamicFilterPhysicalExpr` instead, so that you
only modify the `inner` and not the whole `DynamicFilterPhysicalExpr`. The
difference is that `with_new_children` is then not called at the
`DynamicFilterPhysicalExpr` level.
Re: [PR] dynamic filter refactor [datafusion]
jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040504133
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
]));
// Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
// and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));
Review Comment:
> otherwise that's a ton of API churn.
We only use it in `DatafusionArrowPredicate`; are there any other places we
need to change?
The main point is that `with_new_children` on the main branch isn't doing
the right thing: it should update the source filter, but yours only
updates the remapped filter schema. I think the filter schema is a "parameter"
for remapping column indexes; it doesn't need to be part of the filter
expression at all.
Re: [PR] dynamic filter refactor [datafusion]
jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040471645
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
]));
// Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
// and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));
Review Comment:
> We need reassign_predicate_columns to work, that's what gets called from
within ParquetSource, etc
We can also change `reassign_predicate_columns` inside Parquet if that brings
us to a better state. From my view, `reassign_predicate_columns` is the root
cause of why you end up with a `with_new_children` that doesn't actually update
the "children".
Re: [PR] dynamic filter refactor [datafusion]
adriangb commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040469037
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -36,16 +36,8 @@ use super::Column;
/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
#[derive(Debug)]
pub struct DynamicFilterPhysicalExpr {
-/// The original children of this PhysicalExpr, if any.
-/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
-/// and later remapped to the actual expressions that are being filtered.
-/// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly.
-// columns: Vec>,
-// /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
-// /// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
-remapped_schema: Option,
/// The source of dynamic filters.
-inner: Arc>,
+inner: PhysicalExprRef,
Review Comment:
But how do you pipe the new filter down into other operators?
The whole point is that you can create a filter at planning time, bind it to
a ParquetSource and a SortExec (for example) and then the SortExec can
dynamically update it at runtime.
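A std-only sketch of that flow, assuming a single-column integer filter of the form `value < threshold`. `SharedFilter`, `Scan` and `TopKSort` are hypothetical stand-ins for `ParquetSource` and `SortExec`, not their real APIs. Both operators receive a clone of the same handle at planning time, so an update made by the sort is visible to the scan without re-planning.

```rust
use std::sync::{Arc, RwLock};

/// Shared filter handle: `None` plays the role of the `lit(true)` placeholder,
/// `Some(t)` means "keep rows with value < t" (an assumed filter shape).
type SharedFilter = Arc<RwLock<Option<i64>>>;

/// Hypothetical stand-in for a scan (e.g. a Parquet source) that prunes rows.
struct Scan {
    filter: SharedFilter,
}

impl Scan {
    fn read(&self, rows: &[i64]) -> Vec<i64> {
        // Copy the current threshold out; the read guard is dropped at the end of this statement.
        let threshold = *self.filter.read().unwrap();
        rows.iter()
            .copied()
            .filter(|v| threshold.map_or(true, |t| *v < t))
            .collect()
    }
}

/// Hypothetical stand-in for a sort operator that learns a cutoff at runtime.
struct TopKSort {
    filter: SharedFilter,
}

impl TopKSort {
    fn publish_cutoff(&self, cutoff: i64) {
        *self.filter.write().unwrap() = Some(cutoff);
    }
}
```

What gets bound at planning time here is the handle, not a particular filter value; the value behind it can change while the plan is executing.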
Re: [PR] dynamic filter refactor [datafusion]
adriangb commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040467822
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -159,35 +139,13 @@ impl DynamicFilterPhysicalExpr {
)
})?
.clone();
-let inner =
-Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;
Review Comment:
I feel like we're going in circles... this is the thing that was expected to
be used to produce the snapshots... now we're using snapshots to produce it?
Re: [PR] dynamic filter refactor [datafusion]
adriangb commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040466799
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
]));
// Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
// and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));
Review Comment:
Right but the constraint is that we can't modify what
`reassign_predicate_columns` and similar do internally, otherwise that's a ton
of API churn. The existing design works within the confines of the existing
APIs.
Re: [PR] dynamic filter refactor [datafusion]
jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040452282
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -36,16 +36,8 @@ use super::Column;
/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
#[derive(Debug)]
pub struct DynamicFilterPhysicalExpr {
-/// The original children of this PhysicalExpr, if any.
-/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
-/// and later remapped to the actual expressions that are being filtered.
-/// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly.
-// columns: Vec>,
-// /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
-// /// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
-remapped_schema: Option,
/// The source of dynamic filters.
-inner: Arc>,
+inner: PhysicalExprRef,
Review Comment:
I think we don't need it. Given a source filter, you create a snapshot with
the schema, then evaluate based on the remapped filter. When you need a new
source filter, instead of updating it, just create a new one.
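A small std-only illustration of what "create a new one" means for other holders, using a hypothetical `Operator` type with the filter modelled as a `String`. Replacing an `Arc` changes only the owner that performs the replacement, so any other operator that received the old `Arc` at planning time keeps seeing the old filter unless the new one is delivered to it by some separate mechanism.

```rust
use std::sync::Arc;

/// Hypothetical operator that stores its own `Arc` to an immutable filter.
struct Operator {
    filter: Arc<String>,
}

impl Operator {
    /// "Just create a new one": swap in a fresh filter. Only `self` sees it;
    /// other operators still hold the `Arc` they were given at planning time.
    fn replace_filter(&mut self, new_filter: &str) {
        self.filter = Arc::new(new_filter.to_string());
    }
}
```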
Re: [PR] dynamic filter refactor [datafusion]
jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040451293
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -159,35 +139,13 @@ impl DynamicFilterPhysicalExpr {
)
})?
.clone();
-let inner =
-Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;
Review Comment:
Now you evaluate based on the snapshot. The snapshot is the filter remapped
with your filter schema.
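Under that proposal, evaluation runs directly on the snapshot, whose column indexes already match this consumer's schema, so no per-batch remapping is needed. This is a std-only sketch with a toy `Expr`, a toy `Value`, and rows represented as `Vec<i32>`. All three are assumptions for illustration, not DataFusion or Arrow types.

```rust
/// Toy expression tree whose column indexes are already bound to one schema.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column { name: String, index: usize },
    Literal(i32),
    Eq(Box<Expr>, Box<Expr>),
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum Value {
    Int(i32),
    Bool(bool),
}

/// Evaluate against one row, using the stored indexes directly (no name lookup).
fn eval(expr: &Expr, row: &[i32]) -> Value {
    match expr {
        Expr::Column { index, .. } => Value::Int(row[*index]),
        Expr::Literal(v) => Value::Int(*v),
        Expr::Eq(l, r) => Value::Bool(eval(l, row) == eval(r, row)),
    }
}

/// Keep only the rows for which the snapshot evaluates to `true`.
fn filter_rows(snapshot: &Expr, rows: &[Vec<i32>]) -> Vec<Vec<i32>> {
    rows.iter()
        .filter(|row| eval(snapshot, row) == Value::Bool(true))
        .cloned()
        .collect()
}
```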
Re: [PR] dynamic filter refactor [datafusion]
jayzhan211 commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2040450375
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
]));
// Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
// and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));
Review Comment:
When you need `reassign_predicate_columns`, it basically re-projects the
columns based on the provided schema.
Re: [PR] dynamic filter refactor [datafusion]
adriangb commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2039832734
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -105,47 +97,44 @@ impl DynamicFilterPhysicalExpr {
inner: Arc,
) -> Self {
Self {
-// columns: children,
-// remapped_columns: None, // Initially no remapped children
-remapped_schema: None,
-// remapped_filter: None,
-inner: Arc::new(RwLock::new(inner)),
+inner,
data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
}
}
// udpate schema
-pub fn with_schema(
-&self,
-schema: SchemaRef,
-) -> Self {
-Self {
-remapped_schema: Some(schema),
-inner: Arc::clone(&self.inner),
-data_type: Arc::clone(&self.data_type),
-nullable: Arc::clone(&self.nullable),
-}
-}
+// pub fn with_schema(&self, schema: SchemaRef) -> Self {
+// Self {
+// remapped_schema: Some(schema),
+// inner: Arc::clone(&self.inner),
+// data_type: Arc::clone(&self.data_type),
+// nullable: Arc::clone(&self.nullable),
+// }
+// }
// get the source filter
pub fn current(&self) -> Result> {
-let inner = self
-.inner
-.read()
-.map_err(|_| {
-datafusion_common::DataFusionError::Execution(
-"Failed to acquire read lock for inner".to_string(),
-)
-})?
-.clone();
+let inner = Arc::clone(&self.inner);
+
+// let inner = self
+// .inner
+// .read()
+// .map_err(|_| {
+// datafusion_common::DataFusionError::Execution(
+// "Failed to acquire read lock for inner".to_string(),
+// )
+// })?
+// .clone();
Ok(inner)
}
// update source filter
-pub fn update(&self, filter: PhysicalExprRef) {
-let mut w = self.inner.write().unwrap();
-*w = filter;
+// create a new one
+pub fn update(&mut self, filter: PhysicalExprRef) {
+self.inner = filter;
Review Comment:
How will writers have mutable access to this if they have to package it up
in an `Arc`?
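One way to see the question concretely, using hypothetical toy types (std only). A method taking `&mut self` can only be reached through an `Arc` when that `Arc` is the sole owner, because `Arc::get_mut` returns `None` once the `Arc` has been cloned. A field behind `RwLock`, by contrast, can be updated through `&self` by any holder.

```rust
use std::sync::{Arc, RwLock};

/// Shape with a plain field: `update` needs `&mut self`.
struct PlainFilter {
    inner: String,
}

impl PlainFilter {
    fn update(&mut self, filter: &str) {
        self.inner = filter.to_string();
    }
}

/// Shape with interior mutability: `update` only needs `&self`.
struct LockedFilter {
    inner: RwLock<String>,
}

impl LockedFilter {
    fn update(&self, filter: &str) {
        *self.inner.write().unwrap() = filter.to_string();
    }

    fn current(&self) -> String {
        self.inner.read().unwrap().clone()
    }
}

/// Attempt a `&mut self` update through an `Arc`; only succeeds for the sole owner.
fn try_update_plain(handle: &mut Arc<PlainFilter>, filter: &str) -> bool {
    match Arc::get_mut(handle) {
        Some(plain) => {
            plain.update(filter);
            true
        }
        None => false,
    }
}
```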
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -36,16 +36,8 @@ use super::Column;
/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
#[derive(Debug)]
pub struct DynamicFilterPhysicalExpr {
-/// The original children of this PhysicalExpr, if any.
-/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
-/// and later remapped to the actual expressions that are being filtered.
-/// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly.
-// columns: Vec>,
-// /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children
-// /// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
-remapped_schema: Option,
/// The source of dynamic filters.
-inner: Arc>,
+inner: PhysicalExprRef,
Review Comment:
@jayzhan211 how can this have multiple readers and a writer updating with
some sort of write lock?
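A std-only sketch of the multiple-readers/one-writer pattern being asked about. It assumes the filter is modelled as a `String` behind `Arc<RwLock<String>>`, a hypothetical stand-in rather than the PR's type. Each reader sees either the old or the new filter, depending on scheduling. Once the scope ends, the write is visible to everyone.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Spawn `readers` reader threads and one writer thread over a shared filter.
/// Returns what each reader observed plus the final value.
fn run(readers: usize) -> (Vec<String>, String) {
    let filter = Arc::new(RwLock::new(String::from("true")));
    let seen = thread::scope(|s| {
        let handles: Vec<_> = (0..readers)
            .map(|_| {
                let f = Arc::clone(&filter);
                // Readers may hold the read lock concurrently with each other.
                s.spawn(move || f.read().unwrap().clone())
            })
            .collect();
        // The writer waits until no reader holds the lock, then swaps the filter.
        s.spawn(|| {
            *filter.write().unwrap() = String::from("a < 10");
        });
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .collect::<Vec<_>>()
    });
    // All scoped threads (including the writer) have been joined at this point.
    let last = filter.read().unwrap().clone();
    (seen, last)
}
```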
Re: [PR] dynamic filter refactor [datafusion]
adriangb commented on code in PR #15685:
URL: https://github.com/apache/datafusion/pull/15685#discussion_r2039623779
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -335,22 +313,12 @@ mod test {
]));
// Each ParquetExec calls `with_new_children` on the DynamicFilterPhysicalExpr
// and remaps the children to the file schema.
-let dynamic_filter_1 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_1,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_1.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
-let dynamic_filter_2 = reassign_predicate_columns(
-Arc::clone(&dynamic_filter) as Arc,
-&filter_schema_2,
-false,
-)
-.unwrap();
-let snap = dynamic_filter_2.snapshot().unwrap().unwrap();
-insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_1 = dynamic_filter.with_schema(Arc::clone(&filter_schema_1));
+let snap_1 = dynamic_filter_1.snapshot().unwrap().unwrap();
+insta::assert_snapshot!(format!("{snap_1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42) }, fail_on_overflow: false }"#);
+let dynamic_filter_2 = dynamic_filter.with_schema(Arc::clone(&filter_schema_2));
Review Comment:
What is expected to call `with_schema`? This seems like a new method on
`DynamicFilterPhysicalExpr`. We need `reassign_predicate_columns` to work,
that's what gets called from within `ParquetSource`, etc.
##
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##
@@ -159,35 +139,13 @@ impl DynamicFilterPhysicalExpr {
)
})?
.clone();
-let inner =
-Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;
Review Comment:
This means that `evaluate` no longer uses a version with remapped children
right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
-
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
