This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 089a42ac98 Minor: Add `Column::from(Tableref, &FieldRef)`,
`Expr::from(Column)` and `Expr::from(Tableref, &FieldRef)` (#10178)
089a42ac98 is described below
commit 089a42ac98d3a57dca3c50c791e34d88bdb7af7d
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Apr 23 09:15:39 2024 -0400
Minor: Add `Column::from(Tableref, &FieldRef)`, `Expr::from(Column)` and
`Expr::from(Tableref, &FieldRef)` (#10178)
* Minor: Add `Column::from(Tableref, &FieldRef)`
* Add Expr::from()
* fix docs
* Fix doc test
---
benchmarks/src/tpch/convert.rs | 4 +--
datafusion/common/src/column.rs | 11 +++++-
datafusion/core/src/dataframe/mod.rs | 6 ++--
datafusion/core/src/physical_planner.rs | 11 ++----
datafusion/expr/src/expr.rs | 42 +++++++++++++++++++++-
datafusion/expr/src/expr_rewriter/mod.rs | 8 +----
datafusion/expr/src/logical_plan/builder.rs | 2 +-
datafusion/expr/src/utils.rs | 9 ++---
.../optimizer/src/common_subexpr_eliminate.rs | 7 ++--
.../optimizer/src/replace_distinct_aggregate.rs | 2 +-
datafusion/sql/src/expr/mod.rs | 8 ++---
datafusion/sql/src/statement.rs | 3 +-
12 files changed, 68 insertions(+), 45 deletions(-)
diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs
index a841fe5322..30178d17aa 100644
--- a/benchmarks/src/tpch/convert.rs
+++ b/benchmarks/src/tpch/convert.rs
@@ -88,9 +88,7 @@ impl ConvertOpt {
.schema()
.iter()
.take(schema.fields.len() - 1)
- .map(|(qualifier, field)| {
- Expr::Column(Column::from((qualifier, field.as_ref())))
- })
+ .map(Expr::from)
.collect();
csv = csv.select(selection)?;
diff --git a/datafusion/common/src/column.rs b/datafusion/common/src/column.rs
index dec87d9d07..ae31465163 100644
--- a/datafusion/common/src/column.rs
+++ b/datafusion/common/src/column.rs
@@ -17,7 +17,7 @@
//! Column
-use arrow_schema::Field;
+use arrow_schema::{Field, FieldRef};
use crate::error::_schema_err;
use crate::utils::{parse_identifiers_normalized, quote_identifier};
@@ -63,6 +63,8 @@ impl Column {
}
/// Create Column from unqualified name.
+ ///
+ /// Alias for `Column::new_unqualified`
pub fn from_name(name: impl Into<String>) -> Self {
Self {
relation: None,
@@ -346,6 +348,13 @@ impl From<(Option<&TableReference>, &Field)> for Column {
}
}
+/// Create a column, use qualifier and field name
+impl From<(Option<&TableReference>, &FieldRef)> for Column {
+ fn from((relation, field): (Option<&TableReference>, &FieldRef)) -> Self {
+ Self::new(relation.cloned(), field.name())
+ }
+}
+
impl FromStr for Column {
type Err = Infallible;
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index abf09772e5..bd561e8983 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -1332,7 +1332,7 @@ impl DataFrame {
col_exists = true;
new_column.clone()
} else {
- col(Column::from((qualifier, field.as_ref())))
+ col(Column::from((qualifier, field)))
}
})
.collect();
@@ -1402,9 +1402,9 @@ impl DataFrame {
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
- col(Column::from((qualifier, field.as_ref()))).alias(new_name)
+ col(Column::from((qualifier, field))).alias(new_name)
} else {
- col(Column::from((qualifier, field.as_ref())))
+ col(Column::from((qualifier, field)))
}
})
.collect::<Vec<_>>();
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index e6785b1dec..848f561ffb 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1261,15 +1261,8 @@ impl DefaultPhysicalPlanner {
// Remove temporary projected columns
if left_projected || right_projected {
- let final_join_result = join_schema
- .iter()
- .map(|(qualifier, field)| {
- Expr::Column(datafusion_common::Column::from((
- qualifier,
- field.as_ref(),
- )))
- })
- .collect::<Vec<_>>();
+ let final_join_result =
+ join_schema.iter().map(Expr::from).collect::<Vec<_>>();
let projection =
LogicalPlan::Projection(Projection::try_new(
final_join_result,
Arc::new(new_join),
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index fb75a3cc7a..6f76936806 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -32,7 +32,7 @@ use crate::{
Signature,
};
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, FieldRef};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{
internal_err, plan_err, Column, DFSchema, Result, ScalarValue,
TableReference,
@@ -84,6 +84,29 @@ use sqlparser::ast::NullTreatment;
/// assert_eq!(binary_expr.op, Operator::Eq);
/// }
/// ```
+///
+/// ## Return a list of [`Expr::Column`] from a schema's columns
+/// ```
+/// # use arrow::datatypes::{DataType, Field, Schema};
+/// # use datafusion_common::{DFSchema, Column};
+/// # use datafusion_expr::Expr;
+///
+/// let arrow_schema = Schema::new(vec![
+/// Field::new("c1", DataType::Int32, false),
+/// Field::new("c2", DataType::Float64, false),
+/// ]);
+/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
+///
+/// // Form a list of expressions for each item in the schema
+/// let exprs: Vec<_> = df_schema.iter()
+/// .map(Expr::from)
+/// .collect();
+///
+/// assert_eq!(exprs, vec![
+/// Expr::from(Column::from_qualified_name("t1.c1")),
+/// Expr::from(Column::from_qualified_name("t1.c2")),
+/// ]);
+/// ```
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub enum Expr {
/// An expression with a specific name.
@@ -190,6 +213,23 @@ impl Default for Expr {
}
}
+/// Create an [`Expr`] from a [`Column`]
+impl From<Column> for Expr {
+ fn from(value: Column) -> Self {
+ Expr::Column(value)
+ }
+}
+
+/// Create an [`Expr`] from an optional qualifier and a [`FieldRef`]. This is
+/// useful for creating [`Expr`] from a [`DFSchema`].
+///
+/// See example on [`Expr`]
+impl<'a> From<(Option<&'a TableReference>, &'a FieldRef)> for Expr {
+ fn from(value: (Option<&'a TableReference>, &'a FieldRef)) -> Self {
+ Expr::from(Column::from(value))
+ }
+}
+
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct Unnest {
pub expr: Box<Expr>,
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs
index f5779df812..fd6446eba9 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -218,13 +218,7 @@ pub fn coerce_plan_expr_for_schema(
Ok(LogicalPlan::Projection(projection))
}
_ => {
- let exprs: Vec<Expr> = plan
- .schema()
- .iter()
- .map(|(qualifier, field)| {
- Expr::Column(Column::from((qualifier, field.as_ref())))
- })
- .collect();
+ let exprs: Vec<Expr> = plan.schema().iter().map(Expr::from).collect();
let new_exprs = coerce_exprs_for_schema(exprs, plan.schema(), schema)?;
let add_project = new_exprs.iter().any(|expr| expr.try_into_col().is_err());
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 2810425ae1..fa4b0b9642 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1577,7 +1577,7 @@ pub fn unnest_with_options(
return Ok(input);
}
};
- qualified_columns.push(Column::from((unnest_qualifier, unnested_field.as_ref())));
+ qualified_columns.push(Column::from((unnest_qualifier, &unnested_field)));
unnested_fields.insert(index, unnested_field);
}
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 8da93c244c..64fe98c23b 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -356,12 +356,7 @@ fn get_exprs_except_skipped(
columns_to_skip: HashSet<Column>,
) -> Vec<Expr> {
if columns_to_skip.is_empty() {
- schema
- .iter()
- .map(|(qualifier, field)| {
- Expr::Column(Column::from((qualifier, field.as_ref())))
- })
- .collect::<Vec<Expr>>()
+ schema.iter().map(Expr::from).collect::<Vec<Expr>>()
} else {
schema
.columns()
@@ -855,7 +850,7 @@ pub fn expr_as_column_expr(expr: &Expr, plan: &LogicalPlan) -> Result<Expr> {
match expr {
Expr::Column(col) => {
let (qualifier, field) = plan.schema().qualified_field_from_column(col)?;
- Ok(Expr::Column(Column::from((qualifier, field))))
+ Ok(Expr::from(Column::from((qualifier, field))))
}
_ => Ok(Expr::Column(Column::from_name(expr.display_name()?))),
}
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index 690b596ed3..b859dda9d5 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -506,7 +506,7 @@ fn build_common_expr_project_plan(
for (qualifier, field) in input.schema().iter() {
if fields_set.insert(qualified_name(qualifier, field.name())) {
- project_exprs.push(Expr::Column(Column::from((qualifier, field.as_ref()))));
+ project_exprs.push(Expr::from((qualifier, field)));
}
}
@@ -525,10 +525,7 @@ fn build_recover_project_plan(
schema: &DFSchema,
input: LogicalPlan,
) -> Result<LogicalPlan> {
- let col_exprs = schema
- .iter()
- .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field.as_ref()))))
- .collect();
+ let col_exprs = schema.iter().map(Expr::from).collect();
Ok(LogicalPlan::Projection(Projection::try_new(
col_exprs,
Arc::new(input),
diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs
index f464506057..4f68e2623f 100644
--- a/datafusion/optimizer/src/replace_distinct_aggregate.rs
+++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs
@@ -127,7 +127,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
.skip(on_expr.len())
.zip(schema.iter())
.map(|((new_qualifier, new_field), (old_qualifier, old_field))| {
- Ok(col(Column::from((new_qualifier, new_field.as_ref())))
+ Ok(col(Column::from((new_qualifier, new_field)))
.alias_qualified(old_qualifier.cloned(), old_field.name()))
})
.collect::<Result<Vec<Expr>>>()?;
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index 4bc0719fba..3f2134bf7e 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -21,8 +21,8 @@ use sqlparser::ast::{ArrayAgg, Expr as SQLExpr, JsonOperator, TrimWhereField, Va
use sqlparser::parser::ParserError::ParserError;
use datafusion_common::{
- internal_datafusion_err, internal_err, not_impl_err, plan_err, Column, DFSchema,
- Result, ScalarValue,
+ internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result,
+ ScalarValue,
};
use datafusion_expr::expr::AggregateFunctionDefinition;
use datafusion_expr::expr::InList;
@@ -142,9 +142,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
_ => false,
}) {
- Some((qualifier, df_field)) => {
- Expr::Column(Column::from((qualifier, df_field.as_ref())))
- }
+ Some((qualifier, df_field)) => Expr::from((qualifier, df_field)),
None => Expr::Column(col),
}
}
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 759a5e8ce9..c81217aa70 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -1307,8 +1307,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
))
} else {
datafusion_expr::Expr::Column(Column::from((
- qualifier,
- field.as_ref(),
+ qualifier, field,
)))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]