askalt commented on code in PR #20009:
URL: https://github.com/apache/datafusion/pull/20009#discussion_r2735210582
##########
datafusion/physical-plan/src/projection.rs:
##########
@@ -485,6 +487,44 @@ impl ExecutionPlan for ProjectionExec {
.ok()
})
}
+
+ fn physical_expressions(
+ &self,
+ ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + '_>> {
+ Some(Box::new(self.projector.projection().expr_iter()))
+ }
+
+ fn with_physical_expressions(
+ &self,
+ params: ReplacePhysicalExpr,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ let expected_count = self.expr().len();
+ let exprs_count = params.exprs.len();
+ assert_eq_or_internal_err!(
+ expected_count,
+ exprs_count,
+ "Inconsistent number of physical expressions for {}",
+ self.name()
+ );
+
+ let projection_exprs = self
+ .expr()
+ .iter()
+ .zip(params.exprs)
+ .map(|(p, expr)| ProjectionExpr::new(expr, p.alias.clone()))
+ .collect::<Vec<_>>();
+
+ let projection = ProjectionExprs::from(projection_exprs);
+ let input_schema = self.input.schema();
+ let projector = projection.into_projector(&input_schema)?;
+
+ Ok(Some(Arc::new(Self {
+ projector,
+ input: Arc::clone(&self.input),
+ metrics: ExecutionPlanMetricsSet::new(),
+ cache: self.cache.clone(),
+ })))
Review Comment:
Let's use a shorter form that highlights what actually changed (we could
apply this to most plans):
```rust
Ok(Some(Arc::new(Self {
projector,
metrics: ExecutionPlanMetricsSet::new(),
..self.clone()
})))
```
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1429,6 +1431,47 @@ impl ExecutionPlan for SortExec {
Ok(FilterDescription::new().with_child(child))
}
+
+ fn physical_expressions<'a>(
+ &'a self,
+ ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + 'a>> {
+ Some(Box::new(
+ self.expr.iter().map(|sort| Arc::clone(&sort.expr)),
+ ))
+ }
+
+ fn with_physical_expressions(
+ &self,
+ params: ReplacePhysicalExpr,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ let expected_count = self.expr.len();
+ let exprs_count = params.exprs.len();
+ assert_eq_or_internal_err!(
+ expected_count,
+ exprs_count,
+ "Inconsistent number of physical expressions for {}",
+ self.name()
+ );
+
+ let lex_ordering = LexOrdering::new(
Review Comment:
Can we add a method to `LexOrdering` to avoid duplicating this code across
the different sort nodes?
##########
datafusion/physical-plan/src/joins/symmetric_hash_join.rs:
##########
@@ -647,6 +650,97 @@ impl ExecutionPlan for SymmetricHashJoinExec {
)
.map(|e| Some(Arc::new(e) as _))
}
+
+ fn physical_expressions<'a>(
+ &'a self,
+ ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + 'a>> {
+ let left_sort_iter = self
Review Comment:
It seems we also need to return the expressions in `self.on` (so that a query
like `select * from t1 join t2 on t1.a + $1 = t2.a` works).
##########
datafusion/datasource/src/source.rs:
##########
@@ -215,6 +215,36 @@ pub trait DataSource: Send + Sync + Debug {
fn with_preserve_order(&self, _preserve_order: bool) -> Option<Arc<dyn
DataSource>> {
None
}
+
+ /// Returns an iterator over a subset of [`PhysicalExpr`]s that are used
by this [`DataSource`].
+ fn physical_expressions<'a>(
+ &'a self,
+ ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + 'a>> {
+ None
+ }
+
+ /// Returns a new [`DataSource`] with its physical expressions replaced by
the provided
+ /// `exprs`.
+ ///
+ /// # Constraints
Review Comment:
Let's also add notes here about properties (for example,
`DataSource::EquivalenceProperties`) and state.
##########
datafusion/physical-plan/src/joins/hash_join/exec.rs:
##########
@@ -1384,6 +1386,55 @@ impl ExecutionPlan for HashJoinExec {
}
Ok(result)
}
+
+ fn physical_expressions<'a>(
+ &'a self,
+ ) -> Option<Box<dyn Iterator<Item = Arc<dyn PhysicalExpr>> + 'a>> {
+ self.filter
+ .as_ref()
+ .map(|f| Box::new(std::iter::once(Arc::clone(&f.expression))) as
Box<_>)
+ }
+
+ fn with_physical_expressions(
+ &self,
+ mut params: ReplacePhysicalExpr,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ let expected_count = self.filter.iter().len();
+ let exprs_count = params.exprs.len();
+ assert_eq_or_internal_err!(
+ expected_count,
+ exprs_count,
+ "Inconsistent number of physical expressions for {}",
+ self.name()
+ );
+
+ let filter = self
+ .filter
+ .as_ref()
+ .zip(params.exprs.pop())
+ .map(|(f, expr)| {
+ JoinFilter::new(expr, f.column_indices.clone(),
Arc::clone(&f.schema))
+ });
+
+ Ok(Some(Arc::new(Self {
+ left: Arc::clone(&self.left),
+ right: Arc::clone(&self.right),
+ on: self.on.clone(),
+ filter,
+ join_type: self.join_type,
+ join_schema: Arc::clone(&self.join_schema),
+ left_fut: Arc::clone(&self.left_fut),
Review Comment:
We have two different semantics for runtime state here: metrics are recreated,
while `left_fut` is cloned. Let's explicitly reset the runtime state in
`with_physical_expressions`: recreate `left_fut`, and add a note to the trait
method's doc comment saying that it is OK not to preserve this state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]