This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new d0a51f3 ARROW-4897: [Rust] [DataFusion] Improve rustdocs d0a51f3 is described below commit d0a51f3549f4b186c453f0c2423069d1c25d9dc3 Author: Andy Grove <andygrov...@gmail.com> AuthorDate: Sat Mar 16 16:17:06 2019 -0600 ARROW-4897: [Rust] [DataFusion] Improve rustdocs Also removes dead code and changes the visibility on some structs. Author: Andy Grove <andygrov...@gmail.com> Closes #3922 from andygrove/ARROW-4897 and squashes the following commits: 5fbf510 <Andy Grove> Update LIMIT rustrdoc for consistency d9578d0 <Andy Grove> update docs based on feedback 2b90354 <Andy Grove> more rustdocs 21865fc <Andy Grove> more rustdocs and remove some dead code 871ccd8 <Andy Grove> Improve rustdocs --- rust/datafusion/src/datasource/mod.rs | 2 + rust/datafusion/src/execution/aggregate.rs | 5 +- rust/datafusion/src/execution/context.rs | 1 + rust/datafusion/src/execution/error.rs | 1 + rust/datafusion/src/execution/expression.rs | 9 ++- rust/datafusion/src/execution/filter.rs | 8 +- rust/datafusion/src/execution/limit.rs | 9 ++- rust/datafusion/src/execution/mod.rs | 3 +- rust/datafusion/src/execution/physicalplan.rs | 37 --------- rust/datafusion/src/execution/projection.rs | 11 ++- rust/datafusion/src/execution/relation.rs | 11 ++- rust/datafusion/src/logicalplan.rs | 8 +- rust/datafusion/src/optimizer/mod.rs | 2 + rust/datafusion/src/sql/planner.rs | 108 ++------------------------ 14 files changed, 61 insertions(+), 154 deletions(-) diff --git a/rust/datafusion/src/datasource/mod.rs b/rust/datafusion/src/datasource/mod.rs index 5688fb5..a927756 100644 --- a/rust/datafusion/src/datasource/mod.rs +++ b/rust/datafusion/src/datasource/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! 
DataFusion data sources + pub mod csv; pub mod datasource; pub mod memory; diff --git a/rust/datafusion/src/execution/aggregate.rs b/rust/datafusion/src/execution/aggregate.rs index fb66dbd..d982e39 100644 --- a/rust/datafusion/src/execution/aggregate.rs +++ b/rust/datafusion/src/execution/aggregate.rs @@ -38,7 +38,7 @@ use fnv::FnvHashMap; /// An aggregate relation is made up of zero or more grouping expressions and one /// or more aggregate expressions -pub struct AggregateRelation { +pub(super) struct AggregateRelation { schema: Arc<Schema>, input: Rc<RefCell<Relation>>, group_expr: Vec<RuntimeExpr>, @@ -87,6 +87,7 @@ trait AggregateFunction { fn data_type(&self) -> &DataType; } +/// Implementation of MIN aggregate function #[derive(Debug)] struct MinFunction { data_type: DataType, @@ -161,6 +162,7 @@ impl AggregateFunction for MinFunction { } } +/// Implementation of MAX aggregate function #[derive(Debug)] struct MaxFunction { data_type: DataType, @@ -235,6 +237,7 @@ impl AggregateFunction for MaxFunction { } } +/// Implementation of SUM aggregate function #[derive(Debug)] struct SumFunction { data_type: DataType, diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index 4b586a6..5f40786 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -40,6 +40,7 @@ use crate::optimizer::projection_push_down::ProjectionPushDown; use crate::sql::parser::{DFASTNode, DFParser}; use crate::sql::planner::{SchemaProvider, SqlToRel}; +/// Execution context for registering data sources and executing queries pub struct ExecutionContext { datasources: Rc<RefCell<HashMap<String, Rc<Table>>>>, } diff --git a/rust/datafusion/src/execution/error.rs b/rust/datafusion/src/execution/error.rs index 92ce6d9..b734e6b 100644 --- a/rust/datafusion/src/execution/error.rs +++ b/rust/datafusion/src/execution/error.rs @@ -27,6 +27,7 @@ use sqlparser::sqlparser::ParserError; pub type Result<T> = 
result::Result<T, ExecutionError>; +/// DataFusion execution error #[derive(Debug)] pub enum ExecutionError { IoError(Error), diff --git a/rust/datafusion/src/execution/expression.rs b/rust/datafusion/src/execution/expression.rs index 361a294..5b01af5 100644 --- a/rust/datafusion/src/execution/expression.rs +++ b/rust/datafusion/src/execution/expression.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! Runtime expression support + use std::rc::Rc; use std::sync::Arc; @@ -32,6 +34,7 @@ pub type CompiledExpr = Rc<Fn(&RecordBatch) -> Result<ArrayRef>>; pub type CompiledCastFunction = Rc<Fn(&ArrayRef) -> Result<ArrayRef>>; +/// Enumeration of supported aggregate functions pub enum AggregateType { Min, Max, @@ -42,7 +45,7 @@ pub enum AggregateType { } /// Runtime expression -pub enum RuntimeExpr { +pub(super) enum RuntimeExpr { Compiled { name: String, f: CompiledExpr, @@ -82,7 +85,7 @@ impl RuntimeExpr { } /// Compiles a scalar expression into a closure -pub fn compile_expr( +pub(super) fn compile_expr( ctx: &ExecutionContext, expr: &Expr, input_schema: &Schema, @@ -250,7 +253,7 @@ macro_rules! literal_array { } /// Compiles a scalar expression into a closure -pub fn compile_scalar_expr( +pub(super) fn compile_scalar_expr( ctx: &ExecutionContext, expr: &Expr, input_schema: &Schema, diff --git a/rust/datafusion/src/execution/filter.rs b/rust/datafusion/src/execution/filter.rs index 6ae01a7..5100f26 100644 --- a/rust/datafusion/src/execution/filter.rs +++ b/rust/datafusion/src/execution/filter.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Execution of a filter (predicate) +//! Execution of a filter (predicate) relation. The SQL clause `WHERE expr` represents a filter. 
use std::cell::RefCell; use std::rc::Rc; @@ -30,9 +30,13 @@ use super::error::{ExecutionError, Result}; use super::expression::RuntimeExpr; use super::relation::Relation; -pub struct FilterRelation { +/// Implementation of a filter relation +pub(super) struct FilterRelation { + /// The schema for the filter relation. This is always the same as the schema of the input relation. schema: Arc<Schema>, + /// Relation that is being filtered input: Rc<RefCell<Relation>>, + /// Filter expression expr: RuntimeExpr, } diff --git a/rust/datafusion/src/execution/limit.rs b/rust/datafusion/src/execution/limit.rs index c58e4fd..80d70fe 100644 --- a/rust/datafusion/src/execution/limit.rs +++ b/rust/datafusion/src/execution/limit.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Execution of a limit (predicate) +//! Limit relation, to limit the number of rows returned by a relation use std::cell::RefCell; use std::rc::Rc; @@ -29,10 +29,15 @@ use arrow::record_batch::RecordBatch; use super::error::{ExecutionError, Result}; use super::relation::Relation; -pub struct LimitRelation { +/// Implementation of a LIMIT relation +pub(super) struct LimitRelation { + /// The relation which the limit is being applied to input: Rc<RefCell<Relation>>, + /// The schema for the limit relation, which is always the same as the schema of the input relation schema: Arc<Schema>, + /// The number of rows returned by this relation limit: usize, + /// The number of rows that have been returned so far num_consumed_rows: usize, } diff --git a/rust/datafusion/src/execution/mod.rs b/rust/datafusion/src/execution/mod.rs index f775c16..0ece1e9 100644 --- a/rust/datafusion/src/execution/mod.rs +++ b/rust/datafusion/src/execution/mod.rs @@ -15,12 +15,13 @@ // specific language governing permissions and limitations // under the License. +//! 
DataFusion query execution + pub mod aggregate; pub mod context; pub mod error; pub mod expression; pub mod filter; pub mod limit; -pub mod physicalplan; pub mod projection; pub mod relation; diff --git a/rust/datafusion/src/execution/physicalplan.rs b/rust/datafusion/src/execution/physicalplan.rs deleted file mode 100644 index 23aa431..0000000 --- a/rust/datafusion/src/execution/physicalplan.rs +++ /dev/null @@ -1,37 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use super::super::logicalplan::LogicalPlan; -use std::rc::Rc; - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub enum PhysicalPlan { - /// Run a query and return the results to the client - Interactive { - plan: Rc<LogicalPlan>, - }, - /// Execute a logical plan and write the output to a file - Write { - plan: Rc<LogicalPlan>, - filename: String, - kind: String, - }, - Show { - plan: Rc<LogicalPlan>, - count: usize, - }, -} diff --git a/rust/datafusion/src/execution/projection.rs b/rust/datafusion/src/execution/projection.rs index 9b61395..5c95c65 100644 --- a/rust/datafusion/src/execution/projection.rs +++ b/rust/datafusion/src/execution/projection.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. -//! Execution of a projection +//! Defines the projection relation. A projection determines which columns or expressions are +//! returned from a query. The SQL statement `SELECT a, b, a+b FROM t1` is an example of a +//! projection on table `t1` where the expressions `a`, `b`, and `a+b` are the projection +//! expressions. use std::cell::RefCell; use std::rc::Rc; @@ -29,9 +32,13 @@ use crate::execution::error::Result; use crate::execution::expression::RuntimeExpr; use crate::execution::relation::Relation; -pub struct ProjectRelation { +/// Projection relation +pub(super) struct ProjectRelation { + /// Schema for the result of the projection schema: Arc<Schema>, + /// The relation that the projection is being applied to input: Rc<RefCell<Relation>>, + /// Projection expressions expr: Vec<RuntimeExpr>, } diff --git a/rust/datafusion/src/execution/relation.rs b/rust/datafusion/src/execution/relation.rs index 0011863..ec6276c 100644 --- a/rust/datafusion/src/execution/relation.rs +++ b/rust/datafusion/src/execution/relation.rs @@ -15,6 +15,10 @@ // specific language governing permissions and limitations // under the License. +//! A relation is a representation of a set of tuples. A database table is a +//! 
type of relation. During query execution, each operation on a relation (such as projection, +//! selection, aggregation) results in a new relation. + use std::sync::{Arc, Mutex}; use arrow::datatypes::Schema; @@ -23,8 +27,8 @@ use arrow::record_batch::RecordBatch; use crate::datasource::RecordBatchIterator; use crate::execution::error::Result; -/// trait for all relations (a relation is essentially just an iterator over rows with -/// a known schema) +/// trait for all relations (a relation is essentially just an iterator over batches +/// of data, with a known schema) pub trait Relation { fn next(&mut self) -> Result<Option<RecordBatch>>; @@ -32,7 +36,8 @@ pub trait Relation { fn schema(&self) -> &Arc<Schema>; } -pub struct DataSourceRelation { +/// Implementation of a relation that represents a DataFusion data source +pub(super) struct DataSourceRelation { schema: Arc<Schema>, ds: Arc<Mutex<RecordBatchIterator>>, } diff --git a/rust/datafusion/src/logicalplan.rs b/rust/datafusion/src/logicalplan.rs index b4f4b27..94ebb28 100644 --- a/rust/datafusion/src/logicalplan.rs +++ b/rust/datafusion/src/logicalplan.rs @@ -23,17 +23,23 @@ use std::sync::Arc; use arrow::datatypes::*; +/// Enumeration of supported function types (Scalar and Aggregate) #[derive(Serialize, Deserialize, Debug, Clone)] pub enum FunctionType { Scalar, Aggregate, } +/// Logical representation of a UDF #[derive(Debug, Clone)] pub struct FunctionMeta { + /// Function name name: String, + /// Function arguments args: Vec<Field>, + /// Function return type return_type: DataType, + /// Function type (Scalar or Aggregate) function_type: FunctionType, } @@ -133,7 +139,7 @@ impl ScalarValue { } } -/// Relation Expression +/// Relation expression #[derive(Serialize, Deserialize, Clone, PartialEq)] pub enum Expr { /// index into a value within the row or complex value diff --git a/rust/datafusion/src/optimizer/mod.rs b/rust/datafusion/src/optimizer/mod.rs index 3e50328..27b853f 100644 --- 
a/rust/datafusion/src/optimizer/mod.rs +++ b/rust/datafusion/src/optimizer/mod.rs @@ -15,5 +15,7 @@ // specific language governing permissions and limitations // under the License. +//! Query optimizer module + pub mod optimizer; pub mod projection_push_down; diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index a463b4c..934ba2b 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -17,7 +17,6 @@ //! SQL Query Planner (produces logical plan from SQL AST) -use std::collections::HashSet; use std::string::String; use std::sync::Arc; @@ -28,6 +27,8 @@ use arrow::datatypes::*; use sqlparser::sqlast::*; +/// The SchemaProvider trait allows the query planner to obtain meta-data about tables and +/// functions referenced in SQL statements pub trait SchemaProvider { fn get_table_meta(&self, name: &str) -> Option<Arc<Schema>>; fn get_function_meta(&self, name: &str) -> Option<Arc<FunctionMeta>>; @@ -396,6 +397,8 @@ pub fn convert_data_type(sql: &SQLType) -> Result<DataType> { } } +/// Derive field meta-data for an expression, for use in creating schemas that result from +/// evaluating expressions against an input schema. pub fn expr_to_field(e: &Expr, input_schema: &Schema) -> Field { match e { Expr::Column(i) => input_schema.fields()[*i].clone(), @@ -428,91 +431,14 @@ pub fn expr_to_field(e: &Expr, input_schema: &Schema) -> Field { } } +/// Derive field meta-data for a list of expressions, for use in creating schemas that result from +/// evaluating expressions against an input schema. pub fn exprlist_to_fields(expr: &Vec<Expr>, input_schema: &Schema) -> Vec<Field> { expr.iter() .map(|e| expr_to_field(e, input_schema)) .collect() } -fn collect_expr(e: &Expr, accum: &mut HashSet<usize>) { - match e { - Expr::Column(i) => { - accum.insert(*i); - } - Expr::Cast { ref expr, .. 
} => collect_expr(expr, accum), - Expr::Literal(_) => {} - Expr::IsNotNull(ref expr) => collect_expr(expr, accum), - Expr::IsNull(ref expr) => collect_expr(expr, accum), - Expr::BinaryExpr { - ref left, - ref right, - .. - } => { - collect_expr(left, accum); - collect_expr(right, accum); - } - Expr::AggregateFunction { ref args, .. } => { - args.iter().for_each(|e| collect_expr(e, accum)); - } - Expr::ScalarFunction { ref args, .. } => { - args.iter().for_each(|e| collect_expr(e, accum)); - } - Expr::Sort { ref expr, .. } => collect_expr(expr, accum), - } -} - -pub fn push_down_projection( - plan: &Arc<LogicalPlan>, - projection: &HashSet<usize>, -) -> Arc<LogicalPlan> { - //println!("push_down_projection() projection={:?}", projection); - match plan.as_ref() { - LogicalPlan::Aggregate { - ref input, - ref group_expr, - ref aggr_expr, - ref schema, - } => { - //TODO: apply projection first - let mut accum: HashSet<usize> = HashSet::new(); - group_expr.iter().for_each(|e| collect_expr(e, &mut accum)); - aggr_expr.iter().for_each(|e| collect_expr(e, &mut accum)); - Arc::new(LogicalPlan::Aggregate { - input: push_down_projection(&input, &accum), - group_expr: group_expr.clone(), - aggr_expr: aggr_expr.clone(), - schema: schema.clone(), - }) - } - LogicalPlan::Selection { - ref expr, - ref input, - } => { - let mut accum: HashSet<usize> = projection.clone(); - collect_expr(expr, &mut accum); - Arc::new(LogicalPlan::Selection { - expr: expr.clone(), - input: push_down_projection(&input, &accum), - }) - } - LogicalPlan::TableScan { - ref schema_name, - ref table_name, - ref schema, - .. - } => Arc::new(LogicalPlan::TableScan { - schema_name: schema_name.to_string(), - table_name: table_name.to_string(), - schema: schema.clone(), - projection: Some(projection.iter().cloned().collect()), - }), - LogicalPlan::Projection { .. } => plan.clone(), - LogicalPlan::Sort { .. } => plan.clone(), - LogicalPlan::Limit { .. } => plan.clone(), - LogicalPlan::EmptyRelation { .. 
} => plan.clone(), - } -} - #[cfg(test)] mod tests { @@ -640,28 +566,6 @@ mod tests { quick_test(sql, expected); } - #[test] - fn test_collect_expr() { - let mut accum: HashSet<usize> = HashSet::new(); - collect_expr( - &Expr::Cast { - expr: Arc::new(Expr::Column(3)), - data_type: DataType::Float64, - }, - &mut accum, - ); - collect_expr( - &Expr::Cast { - expr: Arc::new(Expr::Column(3)), - data_type: DataType::Float64, - }, - &mut accum, - ); - println!("accum: {:?}", accum); - assert_eq!(1, accum.len()); - assert!(accum.contains(&3)); - } - /// Create logical plan, write with formatter, compare to expected output fn quick_test(sql: &str, expected: &str) { use sqlparser::dialect::*;