houqp commented on a change in pull request #55:
URL: https://github.com/apache/arrow-datafusion/pull/55#discussion_r650435911



##########
File path: ballista/rust/core/src/serde/logical_plan/from_proto.rs
##########
@@ -1361,43 +1224,3 @@ impl TryFrom<protobuf::WindowFrame> for WindowFrame {
         })
     }
 }
-
-impl From<protobuf::AggregateFunction> for AggregateFunction {
-    fn from(aggr_function: protobuf::AggregateFunction) -> Self {
-        match aggr_function {
-            protobuf::AggregateFunction::Min => AggregateFunction::Min,
-            protobuf::AggregateFunction::Max => AggregateFunction::Max,
-            protobuf::AggregateFunction::Sum => AggregateFunction::Sum,
-            protobuf::AggregateFunction::Avg => AggregateFunction::Avg,
-            protobuf::AggregateFunction::Count => AggregateFunction::Count,
-        }
-    }
-}
-
-impl From<protobuf::BuiltInWindowFunction> for BuiltInWindowFunction {
-    fn from(built_in_function: protobuf::BuiltInWindowFunction) -> Self {
-        match built_in_function {
-            protobuf::BuiltInWindowFunction::RowNumber => {
-                BuiltInWindowFunction::RowNumber
-            }
-            protobuf::BuiltInWindowFunction::Rank => 
BuiltInWindowFunction::Rank,
-            protobuf::BuiltInWindowFunction::PercentRank => {
-                BuiltInWindowFunction::PercentRank
-            }
-            protobuf::BuiltInWindowFunction::DenseRank => {
-                BuiltInWindowFunction::DenseRank
-            }
-            protobuf::BuiltInWindowFunction::Lag => BuiltInWindowFunction::Lag,
-            protobuf::BuiltInWindowFunction::Lead => 
BuiltInWindowFunction::Lead,
-            protobuf::BuiltInWindowFunction::FirstValue => {
-                BuiltInWindowFunction::FirstValue
-            }
-            protobuf::BuiltInWindowFunction::CumeDist => 
BuiltInWindowFunction::CumeDist,
-            protobuf::BuiltInWindowFunction::Ntile => 
BuiltInWindowFunction::Ntile,
-            protobuf::BuiltInWindowFunction::NthValue => 
BuiltInWindowFunction::NthValue,
-            protobuf::BuiltInWindowFunction::LastValue => {
-                BuiltInWindowFunction::LastValue
-            }
-        }
-    }
-}

Review comment:
       All of this shared code has been moved into `ballista/rust/core/src/serde/mod.rs`.

##########
File path: datafusion/src/logical_plan/builder.rs
##########
@@ -210,69 +238,103 @@ impl LogicalPlanBuilder {
     }
 
     /// Apply a sort
-    pub fn sort(&self, expr: impl IntoIterator<Item = Expr>) -> Result<Self> {
+    pub fn sort(&self, exprs: impl IntoIterator<Item = Expr>) -> Result<Self> {
+        let schemas = self.plan.all_schemas();
         Ok(Self::from(&LogicalPlan::Sort {
-            expr: expr.into_iter().collect(),
+            expr: normalize_cols(exprs, &schemas)?,
             input: Arc::new(self.plan.clone()),
         }))
     }
 
     /// Apply a union
     pub fn union(&self, plan: LogicalPlan) -> Result<Self> {
-        let schema = self.plan.schema();

Review comment:
       Refactored the implementation into a `union_with_alias` function so that the same logic can be reused in the SQL planner.

##########
File path: datafusion/src/physical_plan/planner.rs
##########
@@ -56,6 +56,121 @@ use expressions::col;
 use log::debug;
 use std::sync::Arc;
 
+fn create_function_physical_name(
+    fun: &str,
+    distinct: bool,
+    args: &[Expr],
+    input_schema: &DFSchema,
+) -> Result<String> {
+    let names: Vec<String> = args
+        .iter()
+        .map(|e| physical_name(e, input_schema))
+        .collect::<Result<_>>()?;
+
+    let distinct_str = match distinct {
+        true => "DISTINCT ",
+        false => "",
+    };
+    Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
+}
+
+fn physical_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {

Review comment:
       This function generates the physical field name for a given expression, following the output field name semantics specified in https://github.com/apache/arrow-datafusion/blob/master/docs/specification/output-field-name-semantic.md.
   
   I kept `Expr.name` as is, because it is more readable in query `EXPLAIN` and debug output.

##########
File path: datafusion/src/physical_plan/planner.rs
##########
@@ -501,17 +684,20 @@ impl DefaultPhysicalPlanner {
     pub fn create_physical_expr(
         &self,
         e: &Expr,
+        input_dfschema: &DFSchema,

Review comment:
       `create_physical_expr` now requires the logical plan schema so that it can look up column indices.

##########
File path: datafusion/src/sql/planner.rs
##########
@@ -424,17 +408,24 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         ctes: &mut HashMap<String, LogicalPlan>,
     ) -> Result<LogicalPlan> {
         match relation {
-            TableFactor::Table { name, .. } => {
+            TableFactor::Table { name, alias, .. } => {
                 let table_name = name.to_string();
                 let cte = ctes.get(&table_name);
                 match (
                     cte,
                     self.schema_provider.get_table_provider(name.try_into()?),
                 ) {
                     (Some(cte_plan), _) => Ok(cte_plan.clone()),
-                    (_, Some(provider)) => {
-                        LogicalPlanBuilder::scan(&table_name, provider, 
None)?.build()
-                    }
+                    (_, Some(provider)) => LogicalPlanBuilder::scan(
+                        // take alias into account to support `JOIN table1 as 
table2`
+                        alias
+                            .as_ref()
+                            .map(|a| a.name.value.as_str())
+                            .unwrap_or(&table_name),
+                        provider,
+                        None,
+                    )?
+                    .build(),

Review comment:
       This change has been split out into https://github.com/apache/arrow-datafusion/pull/547 for easier review.

##########
File path: datafusion/src/logical_plan/expr.rs
##########
@@ -1240,7 +1394,7 @@ impl fmt::Debug for Expr {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match self {
             Expr::Alias(expr, alias) => write!(f, "{:?} AS {}", expr, alias),
-            Expr::Column(name) => write!(f, "#{}", name),
+            Expr::Column(c) => write!(f, "{}", c),

Review comment:
       The `#` prefix is now added by `Column`'s own `fmt` implementation.

##########
File path: datafusion/src/physical_plan/expressions/column.rs
##########
@@ -61,26 +68,21 @@ impl PhysicalExpr for Column {
 
     /// Get the data type of this expression, given the schema of the input
     fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
-        Ok(input_schema
-            .field_with_name(&self.name)?
-            .data_type()
-            .clone())
+        Ok(input_schema.field(self.index).data_type().clone())
     }
 
     /// Decide whehter this expression is nullable, given the schema of the 
input
     fn nullable(&self, input_schema: &Schema) -> Result<bool> {
-        Ok(input_schema.field_with_name(&self.name)?.is_nullable())
+        Ok(input_schema.field(self.index).is_nullable())
     }
 
     /// Evaluate the expression
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
-        Ok(ColumnarValue::Array(
-            batch.column(batch.schema().index_of(&self.name)?).clone(),
-        ))
+        Ok(ColumnarValue::Array(batch.column(self.index).clone()))

Review comment:
       Physical column expression evaluation now indexes the record batch column directly, instead of doing string comparisons against every field name.

##########
File path: datafusion/src/physical_plan/functions.rs
##########
@@ -512,39 +512,35 @@ macro_rules! invoke_if_unicode_expressions_feature_flag {
     };
 }
 
-/// Create a physical (function) expression.
-/// This function errors when `args`' can't be coerced to a valid argument 
type of the function.
-pub fn create_physical_expr(
+/// Create a physical scalar function.

Review comment:
       This change has been split out into https://github.com/apache/arrow-datafusion/pull/548 for easier review.

##########
File path: ballista/rust/core/src/serde/logical_plan/from_proto.rs
##########
@@ -1361,43 +1224,3 @@ impl TryFrom<protobuf::WindowFrame> for WindowFrame {
         })
     }
 }
-
-impl From<protobuf::AggregateFunction> for AggregateFunction {
-    fn from(aggr_function: protobuf::AggregateFunction) -> Self {
-        match aggr_function {
-            protobuf::AggregateFunction::Min => AggregateFunction::Min,
-            protobuf::AggregateFunction::Max => AggregateFunction::Max,
-            protobuf::AggregateFunction::Sum => AggregateFunction::Sum,
-            protobuf::AggregateFunction::Avg => AggregateFunction::Avg,
-            protobuf::AggregateFunction::Count => AggregateFunction::Count,
-        }
-    }
-}
-
-impl From<protobuf::BuiltInWindowFunction> for BuiltInWindowFunction {
-    fn from(built_in_function: protobuf::BuiltInWindowFunction) -> Self {
-        match built_in_function {
-            protobuf::BuiltInWindowFunction::RowNumber => {
-                BuiltInWindowFunction::RowNumber
-            }
-            protobuf::BuiltInWindowFunction::Rank => 
BuiltInWindowFunction::Rank,
-            protobuf::BuiltInWindowFunction::PercentRank => {
-                BuiltInWindowFunction::PercentRank
-            }
-            protobuf::BuiltInWindowFunction::DenseRank => {
-                BuiltInWindowFunction::DenseRank
-            }
-            protobuf::BuiltInWindowFunction::Lag => BuiltInWindowFunction::Lag,
-            protobuf::BuiltInWindowFunction::Lead => 
BuiltInWindowFunction::Lead,
-            protobuf::BuiltInWindowFunction::FirstValue => {
-                BuiltInWindowFunction::FirstValue
-            }
-            protobuf::BuiltInWindowFunction::CumeDist => 
BuiltInWindowFunction::CumeDist,
-            protobuf::BuiltInWindowFunction::Ntile => 
BuiltInWindowFunction::Ntile,
-            protobuf::BuiltInWindowFunction::NthValue => 
BuiltInWindowFunction::NthValue,
-            protobuf::BuiltInWindowFunction::LastValue => {
-                BuiltInWindowFunction::LastValue
-            }
-        }
-    }
-}

Review comment:
       All of this shared code has been moved into `ballista/rust/core/src/serde/mod.rs` so that it can be shared with physical plan ser/de.

##########
File path: ballista/rust/core/proto/ballista.proto
##########
@@ -408,6 +421,119 @@ message PhysicalPlanNode {
   }
 }
 
+// physical expressions

Review comment:
       Physical expressions are now serialized and deserialized without involving logical expressions. This is necessary because logical and physical columns are represented differently: logical columns carry a relation qualifier, while physical columns carry a column index. It should also speed up deserialization of physical plans, because physical expressions no longer need to be run through the physical planner.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to