alamb commented on code in PR #16589:
URL: https://github.com/apache/datafusion/pull/16589#discussion_r2261411619
##########
datafusion/physical-expr-adapter/README.md:
##########
@@ -0,0 +1,14 @@
+# DataFusion Physical Expression Adapter
+
+This crate provides physical expression schema adaptation utilities for DataFusion that allow adapting a `PhysicalExpr` to different schema types.
+This handles cases such as `lit(ScalarValue::Int32(123)) = int64_column` by rewriting it to `lit(ScalarValue::Int32(123)) = cast(int64_column, 'Int32')`
+(note: this does not attempt to then simplify such expressions, that is done by shared simplifiers).
+
+## Overview
Review Comment:
I recommend putting this information in the doc comments of
`PhysicalExprSchemaRewriter` itself, since someone is more likely to find it
there (either on docs.rs or in their IDE) than in this README.
The high-level overview is cool, but I recommend keeping the README
relatively brief.
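To illustrate the suggestion, the README's example could live on the type as rustdoc. The sketch below is hypothetical and self-contained: `ExampleSchemaRewriter`, `Expr` and `DataType` are simplified stand-ins, not DataFusion's API. It assumes only what the README excerpt describes, namely that a column whose physical (file) type differs from its logical (table) type gets wrapped in a cast to the logical type.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for Arrow data types (not `arrow::datatypes::DataType`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataType {
    Int32,
    Int64,
}

/// Hypothetical stand-in for a physical expression tree.
#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    LiteralInt32(i32),
    Column(String),
    Cast(Box<Expr>, DataType),
    Eq(Box<Expr>, Box<Expr>),
}

/// Adapts expressions written against a *logical* (table) schema so they can be
/// evaluated against a file's *physical* schema.
///
/// For example, if the table declares `c` as `Int32` but a file stores it as
/// `Int64`, then `lit(Int32(123)) = c` is rewritten to
/// `lit(Int32(123)) = cast(c, Int32)`.
///
/// Note: the rewritten expression is not simplified here; that is left to
/// the shared simplifiers.
pub struct ExampleSchemaRewriter {
    logical: HashMap<String, DataType>,
    physical: HashMap<String, DataType>,
}

impl ExampleSchemaRewriter {
    pub fn new(logical: HashMap<String, DataType>, physical: HashMap<String, DataType>) -> Self {
        Self { logical, physical }
    }

    /// Recursively rewrites `expr`, wrapping every column whose physical type
    /// differs from its logical type in a cast to the logical type.
    pub fn rewrite(&self, expr: Expr) -> Expr {
        match expr {
            Expr::Column(name) => match (self.logical.get(&name), self.physical.get(&name)) {
                (Some(&logical), Some(&physical)) if logical != physical => {
                    Expr::Cast(Box::new(Expr::Column(name)), logical)
                }
                // Same type (or unknown column): leave the column untouched.
                _ => Expr::Column(name),
            },
            Expr::Cast(inner, ty) => Expr::Cast(Box::new(self.rewrite(*inner)), ty),
            Expr::Eq(l, r) => Expr::Eq(Box::new(self.rewrite(*l)), Box::new(self.rewrite(*r))),
            lit @ Expr::LiteralInt32(_) => lit,
        }
    }
}
```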
##########
datafusion/physical-expr-adapter/src/schema_rewriter.rs:
##########
@@ -220,13 +223,111 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
&self,
expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+ if let Some(transformed) = self.try_rewrite_struct_field_access(&expr)? {
+ return Ok(Transformed::yes(transformed));
+ }
+
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
return self.rewrite_column(Arc::clone(&expr), column);
}
Ok(Transformed::no(expr))
}
+ fn try_rewrite_struct_field_access(
+ &self,
+ expr: &Arc<dyn PhysicalExpr>,
+ ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+ let get_field_expr = match expr.as_any().downcast_ref::<ScalarFunctionExpr>() {
+ Some(expr) => expr,
+ None => return Ok(None),
+ };
+
+ if get_field_expr.name() != "get_field" {
Review Comment:
Why do we need to check the function name? Isn't the
`downcast_ref::<GetFieldFunc>().is_none()` check sufficient?
Also, it seems to me that this rewrite is specific to `GetFieldFunc`, so I
think it might make sense, long term, to put the function alongside
`GetFieldFunc` somehow 🤔
Maybe we could add a method to the `PhysicalExpr` trait like
`rewrite_for_schema` or something 🤔
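A minimal sketch of both points, using hypothetical stand-in types rather than DataFusion's real `PhysicalExpr`, `ScalarFunctionExpr` or `GetFieldFunc`. A successful downcast already pins down the concrete type, so a separate name comparison adds nothing. A hypothetical `rewrite_for_schema` trait method that defaults to `None` lets the rewriter delegate without knowing about `get_field` at all. Replacing a missing struct field with a NULL literal is an assumption made for illustration, not necessarily what this PR does.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical, heavily simplified stand-in for DataFusion's `PhysicalExpr`.
pub trait PhysicalExpr: std::fmt::Debug {
    fn as_any(&self) -> &dyn Any;

    /// Hypothetical hook: an expression that knows how to adapt itself to the
    /// file schema returns `Some(rewritten)`; everything else keeps `None`.
    fn rewrite_for_schema(&self, _physical_fields: &[&str]) -> Option<Arc<dyn PhysicalExpr>> {
        None
    }
}

#[derive(Debug)]
pub struct Column {
    pub name: String,
}

impl PhysicalExpr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

#[derive(Debug)]
pub struct NullLiteral;

impl PhysicalExpr for NullLiteral {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Hypothetical stand-in for a `get_field(struct_col, 'name')` call.
#[derive(Debug)]
pub struct GetField {
    pub source: Arc<dyn PhysicalExpr>,
    pub field: String,
}

impl PhysicalExpr for GetField {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn rewrite_for_schema(&self, physical_fields: &[&str]) -> Option<Arc<dyn PhysicalExpr>> {
        // Assumption for illustration: a field missing from the file's struct reads as NULL.
        if physical_fields.contains(&self.field.as_str()) {
            None
        } else {
            Some(Arc::new(NullLiteral))
        }
    }
}

/// A type check alone identifies the node: if this downcast succeeds, the
/// concrete type is `GetField`, so comparing names is redundant.
pub fn is_get_field(expr: &dyn PhysicalExpr) -> bool {
    expr.as_any().downcast_ref::<GetField>().is_some()
}

/// The rewriter only calls the trait method; no name check and no downcast.
pub fn rewrite(expr: Arc<dyn PhysicalExpr>, physical_fields: &[&str]) -> Arc<dyn PhysicalExpr> {
    expr.rewrite_for_schema(physical_fields).unwrap_or(expr)
}
```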
##########
datafusion/physical-expr-adapter/src/schema_rewriter.rs:
##########
@@ -97,13 +101,111 @@ impl<'a> PhysicalExprSchemaRewriter<'a> {
&self,
expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+ if let Some(transformed) = self.try_rewrite_struct_field_access(&expr)? {
+ return Ok(Transformed::yes(transformed));
+ }
+
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
return self.rewrite_column(Arc::clone(&expr), column);
}
Ok(Transformed::no(expr))
}
+ fn try_rewrite_struct_field_access(
Review Comment:
Maybe we can file a ticket to make sure this is properly tracked.
##########
datafusion/physical-expr-adapter/src/schema_rewriter.rs:
##########
@@ -220,13 +223,111 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
&self,
expr: Arc<dyn PhysicalExpr>,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+ if let Some(transformed) = self.try_rewrite_struct_field_access(&expr)? {
+ return Ok(Transformed::yes(transformed));
+ }
+
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
return self.rewrite_column(Arc::clone(&expr), column);
}
Ok(Transformed::no(expr))
}
+ fn try_rewrite_struct_field_access(
+ &self,
+ expr: &Arc<dyn PhysicalExpr>,
+ ) -> Result<Option<Arc<dyn PhysicalExpr>>> {
+ let get_field_expr = match expr.as_any().downcast_ref::<ScalarFunctionExpr>() {
+ Some(expr) => expr,
+ None => return Ok(None),
+ };
+
+ if get_field_expr.name() != "get_field" {
+ return Ok(None);
+ }
+
+ if get_field_expr
+ .fun()
+ .inner()
+ .as_any()
+ .downcast_ref::<GetFieldFunc>()
+ .is_none()
+ {
+ return Ok(None);
+ }
+
+ let source_expr = match get_field_expr.args().first() {
+ Some(expr) => expr,
+ None => return Ok(None),
+ };
+
+ let field_name_expr = match get_field_expr.args().get(1) {
+ Some(expr) => expr,
+ None => return Ok(None),
+ };
+
+ let lit = match field_name_expr
+ .as_any()
+ .downcast_ref::<expressions::Literal>()
+ {
+ Some(lit) => lit,
+ None => return Ok(None),
+ };
+
+ let field_name = match lit.value() {
Review Comment:
`ScalarValue::try_as_str` might be useful here:
https://docs.rs/datafusion/latest/datafusion/common/enum.ScalarValue.html#method.try_as_str
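For reference, as we understand its docs, `try_as_str` returns `Option<Option<&str>>`: `None` for non-string types, `Some(None)` for a null string, and `Some(Some(s))` otherwise. The sketch below uses a hypothetical, trimmed-down `ScalarValue` that mimics that contract (the real enum has many more variants) to show how the field-name extraction collapses into a single `let ... else`. `field_name` is a hypothetical helper.

```rust
/// Hypothetical, trimmed-down stand-in for `datafusion_common::ScalarValue`.
#[derive(Debug, Clone, PartialEq)]
pub enum ScalarValue {
    Utf8(Option<String>),
    LargeUtf8(Option<String>),
    Utf8View(Option<String>),
    Int32(Option<i32>),
}

impl ScalarValue {
    /// Mimics the documented contract of `ScalarValue::try_as_str`:
    /// `Some(Some(s))` for a non-null string, `Some(None)` for a null string,
    /// `None` for a non-string type.
    pub fn try_as_str(&self) -> Option<Option<&str>> {
        match self {
            ScalarValue::Utf8(v) | ScalarValue::LargeUtf8(v) | ScalarValue::Utf8View(v) => {
                Some(v.as_deref())
            }
            _ => None,
        }
    }
}

/// Extracts the `get_field` field-name argument, bailing out for anything that
/// is not a non-null string literal (mirroring the early `return Ok(None)`s).
pub fn field_name(value: &ScalarValue) -> Option<&str> {
    let Some(Some(name)) = value.try_as_str() else {
        return None;
    };
    Some(name)
}
```

If no early return is needed, `value.try_as_str().flatten()` is an equivalent one-liner.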
##########
datafusion/physical-expr-adapter/src/schema_rewriter.rs:
##########
@@ -426,6 +527,77 @@ mod tests {
);
}
+ #[test]
+ fn test_rewrite_struct_column_incompatible() {
+ let physical_schema = Schema::new(vec![Field::new(
+ "data",
+ DataType::Struct(vec![Field::new("field1", DataType::Binary, true)].into()),
+ true,
+ )]);
+
+ let logical_schema = Schema::new(vec![Field::new(
+ "data",
+ DataType::Struct(vec![Field::new("field1", DataType::Int32, true)].into()),
+ true,
+ )]);
+
+ let factory = DefaultPhysicalExprAdapterFactory;
+ let adapter = factory.create(Arc::new(logical_schema), Arc::new(physical_schema));
+ let column_expr = Arc::new(Column::new("data", 0));
+
+ let result = adapter.rewrite(column_expr);
+ assert!(result.is_err());
+ let error_msg = result.unwrap_err().to_string();
+ assert_contains!(error_msg, "Cannot cast column 'data'");
Review Comment:
You can make this simpler by using `unwrap_err`, I think:
```suggestion
let error_msg = adapter.rewrite(column_expr).unwrap_err().to_string();
assert_contains!(error_msg, "Cannot cast column 'data'");
```
Similarly for the other tests below.
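A self-contained sketch of the pattern, with a hypothetical `rewrite` function and `AdaptError` type standing in for the adapter and `DataFusionError`. It uses `assert!(... .contains(...))` in place of DataFusion's `assert_contains!` macro so it compiles without extra crates.

```rust
use std::fmt;

/// Hypothetical error type standing in for `DataFusionError`.
#[derive(Debug)]
pub struct AdaptError(String);

impl fmt::Display for AdaptError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Hypothetical stand-in for `adapter.rewrite(...)` on an incompatible struct column.
pub fn rewrite(column: &str) -> Result<String, AdaptError> {
    Err(AdaptError(format!("Cannot cast column '{column}'")))
}

#[test]
fn rewrite_incompatible_struct_errors() {
    // `unwrap_err` panics if the result is `Ok`, so it already asserts failure;
    // a separate `assert!(result.is_err())` is unnecessary.
    let error_msg = rewrite("data").unwrap_err().to_string();
    assert!(error_msg.contains("Cannot cast column 'data'"));
}
```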
--
This is an automated message from the Apache Git Service.