houqp commented on a change in pull request #55: URL: https://github.com/apache/arrow-datafusion/pull/55#discussion_r650435911
########## File path: ballista/rust/core/src/serde/logical_plan/from_proto.rs ########## @@ -1361,43 +1224,3 @@ impl TryFrom<protobuf::WindowFrame> for WindowFrame { }) } } - -impl From<protobuf::AggregateFunction> for AggregateFunction { - fn from(aggr_function: protobuf::AggregateFunction) -> Self { - match aggr_function { - protobuf::AggregateFunction::Min => AggregateFunction::Min, - protobuf::AggregateFunction::Max => AggregateFunction::Max, - protobuf::AggregateFunction::Sum => AggregateFunction::Sum, - protobuf::AggregateFunction::Avg => AggregateFunction::Avg, - protobuf::AggregateFunction::Count => AggregateFunction::Count, - } - } -} - -impl From<protobuf::BuiltInWindowFunction> for BuiltInWindowFunction { - fn from(built_in_function: protobuf::BuiltInWindowFunction) -> Self { - match built_in_function { - protobuf::BuiltInWindowFunction::RowNumber => { - BuiltInWindowFunction::RowNumber - } - protobuf::BuiltInWindowFunction::Rank => BuiltInWindowFunction::Rank, - protobuf::BuiltInWindowFunction::PercentRank => { - BuiltInWindowFunction::PercentRank - } - protobuf::BuiltInWindowFunction::DenseRank => { - BuiltInWindowFunction::DenseRank - } - protobuf::BuiltInWindowFunction::Lag => BuiltInWindowFunction::Lag, - protobuf::BuiltInWindowFunction::Lead => BuiltInWindowFunction::Lead, - protobuf::BuiltInWindowFunction::FirstValue => { - BuiltInWindowFunction::FirstValue - } - protobuf::BuiltInWindowFunction::CumeDist => BuiltInWindowFunction::CumeDist, - protobuf::BuiltInWindowFunction::Ntile => BuiltInWindowFunction::Ntile, - protobuf::BuiltInWindowFunction::NthValue => BuiltInWindowFunction::NthValue, - protobuf::BuiltInWindowFunction::LastValue => { - BuiltInWindowFunction::LastValue - } - } - } -} Review comment: All of this shared code has been moved into `ballista/rust/core/src/serde/mod.rs`. 
########## File path: datafusion/src/logical_plan/builder.rs ########## @@ -210,69 +238,103 @@ impl LogicalPlanBuilder { } /// Apply a sort - pub fn sort(&self, expr: impl IntoIterator<Item = Expr>) -> Result<Self> { + pub fn sort(&self, exprs: impl IntoIterator<Item = Expr>) -> Result<Self> { + let schemas = self.plan.all_schemas(); Ok(Self::from(&LogicalPlan::Sort { - expr: expr.into_iter().collect(), + expr: normalize_cols(exprs, &schemas)?, input: Arc::new(self.plan.clone()), })) } /// Apply a union pub fn union(&self, plan: LogicalPlan) -> Result<Self> { - let schema = self.plan.schema(); Review comment: refactored implementation into `union_with_alias` function so the same logic can be reused in sql planner. ########## File path: datafusion/src/physical_plan/planner.rs ########## @@ -56,6 +56,121 @@ use expressions::col; use log::debug; use std::sync::Arc; +fn create_function_physical_name( + fun: &str, + distinct: bool, + args: &[Expr], + input_schema: &DFSchema, +) -> Result<String> { + let names: Vec<String> = args + .iter() + .map(|e| physical_name(e, input_schema)) + .collect::<Result<_>>()?; + + let distinct_str = match distinct { + true => "DISTINCT ", + false => "", + }; + Ok(format!("{}({}{})", fun, distinct_str, names.join(","))) +} + +fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> { Review comment: This function is responsible for generating physical field name for a given expression based on https://github.com/apache/arrow-datafusion/blob/master/docs/specification/output-field-name-semantic.md. I kept `Expr.name` as is for better readability in query explain and debug output. 
########## File path: datafusion/src/physical_plan/planner.rs ########## @@ -501,17 +684,20 @@ impl DefaultPhysicalPlanner { pub fn create_physical_expr( &self, e: &Expr, + input_dfschema: &DFSchema, Review comment: `create_physical_expr` now requires logical plan schema for column index lookup ########## File path: datafusion/src/sql/planner.rs ########## @@ -424,17 +408,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { ctes: &mut HashMap<String, LogicalPlan>, ) -> Result<LogicalPlan> { match relation { - TableFactor::Table { name, .. } => { + TableFactor::Table { name, alias, .. } => { let table_name = name.to_string(); let cte = ctes.get(&table_name); match ( cte, self.schema_provider.get_table_provider(name.try_into()?), ) { (Some(cte_plan), _) => Ok(cte_plan.clone()), - (_, Some(provider)) => { - LogicalPlanBuilder::scan(&table_name, provider, None)?.build() - } + (_, Some(provider)) => LogicalPlanBuilder::scan( + // take alias into account to support `JOIN table1 as table2` + alias + .as_ref() + .map(|a| a.name.value.as_str()) + .unwrap_or(&table_name), + provider, + None, + )? + .build(), Review comment: this change has been split out into https://github.com/apache/arrow-datafusion/pull/547 for easier review. ########## File path: datafusion/src/logical_plan/expr.rs ########## @@ -1240,7 +1394,7 @@ impl fmt::Debug for Expr { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Expr::Alias(expr, alias) => write!(f, "{:?} AS {}", expr, alias), - Expr::Column(name) => write!(f, "#{}", name), + Expr::Column(c) => write!(f, "{}", c), Review comment: `#` prefix is now provided by Column's `fmt` method. ########## File path: datafusion/src/physical_plan/expressions/column.rs ########## @@ -61,26 +68,21 @@ impl PhysicalExpr for Column { /// Get the data type of this expression, given the schema of the input fn data_type(&self, input_schema: &Schema) -> Result<DataType> { - Ok(input_schema - .field_with_name(&self.name)? 
- .data_type() - .clone()) + Ok(input_schema.field(self.index).data_type().clone()) } /// Decide whehter this expression is nullable, given the schema of the input fn nullable(&self, input_schema: &Schema) -> Result<bool> { - Ok(input_schema.field_with_name(&self.name)?.is_nullable()) + Ok(input_schema.field(self.index).is_nullable()) } /// Evaluate the expression fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> { - Ok(ColumnarValue::Array( - batch.column(batch.schema().index_of(&self.name)?).clone(), - )) + Ok(ColumnarValue::Array(batch.column(self.index).clone())) Review comment: Physical column expression evaluation now indexes record batch column directly instead of doing string comparisons on all field names. ########## File path: datafusion/src/physical_plan/functions.rs ########## @@ -512,39 +512,35 @@ macro_rules! invoke_if_unicode_expressions_feature_flag { }; } -/// Create a physical (function) expression. -/// This function errors when `args`' can't be coerced to a valid argument type of the function. -pub fn create_physical_expr( +/// Create a physical scalar function. Review comment: this change has been split out into https://github.com/apache/arrow-datafusion/pull/548 for easier review. 
########## File path: ballista/rust/core/src/serde/logical_plan/from_proto.rs ########## @@ -1361,43 +1224,3 @@ impl TryFrom<protobuf::WindowFrame> for WindowFrame { }) } } - -impl From<protobuf::AggregateFunction> for AggregateFunction { - fn from(aggr_function: protobuf::AggregateFunction) -> Self { - match aggr_function { - protobuf::AggregateFunction::Min => AggregateFunction::Min, - protobuf::AggregateFunction::Max => AggregateFunction::Max, - protobuf::AggregateFunction::Sum => AggregateFunction::Sum, - protobuf::AggregateFunction::Avg => AggregateFunction::Avg, - protobuf::AggregateFunction::Count => AggregateFunction::Count, - } - } -} - -impl From<protobuf::BuiltInWindowFunction> for BuiltInWindowFunction { - fn from(built_in_function: protobuf::BuiltInWindowFunction) -> Self { - match built_in_function { - protobuf::BuiltInWindowFunction::RowNumber => { - BuiltInWindowFunction::RowNumber - } - protobuf::BuiltInWindowFunction::Rank => BuiltInWindowFunction::Rank, - protobuf::BuiltInWindowFunction::PercentRank => { - BuiltInWindowFunction::PercentRank - } - protobuf::BuiltInWindowFunction::DenseRank => { - BuiltInWindowFunction::DenseRank - } - protobuf::BuiltInWindowFunction::Lag => BuiltInWindowFunction::Lag, - protobuf::BuiltInWindowFunction::Lead => BuiltInWindowFunction::Lead, - protobuf::BuiltInWindowFunction::FirstValue => { - BuiltInWindowFunction::FirstValue - } - protobuf::BuiltInWindowFunction::CumeDist => BuiltInWindowFunction::CumeDist, - protobuf::BuiltInWindowFunction::Ntile => BuiltInWindowFunction::Ntile, - protobuf::BuiltInWindowFunction::NthValue => BuiltInWindowFunction::NthValue, - protobuf::BuiltInWindowFunction::LastValue => { - BuiltInWindowFunction::LastValue - } - } - } -} Review comment: All of this shared code has been moved into `ballista/rust/core/src/serde/mod.rs` so that it can be shared with physical plan ser/de.
########## File path: ballista/rust/core/proto/ballista.proto ########## @@ -408,6 +421,119 @@ message PhysicalPlanNode { } } +// physical expressions Review comment: Physical expressions are now serialized and deserialized without involving logical expressions. This is needed because logical columns and physical columns are represented differently: the former with a relation qualifier and the latter with a column index. This should also speed up deserialization of physical plans, because we no longer need to run physical expressions through the physical planner. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org