This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new f98de24 ARROW-9815 [Rust] [DataFusion] Fixed deadlock caused by accessing the scalar functions' registry. f98de24 is described below commit f98de241cb04deab3b9f8281638f79b9b57a736a Author: Jorge C. Leitao <jorgecarlei...@gmail.com> AuthorDate: Sat Aug 22 11:05:40 2020 -0600 ARROW-9815 [Rust] [DataFusion] Fixed deadlock caused by accessing the scalar functions' registry. @andygrove and @alamb, I have no formal training in thread and mutex management, so I am not certain about this proposal or the following explanation: My understanding is that because the result of ``` ctx_state .lock() .expect("failed to lock mutex") .scalar_functions .lock() .expect("failed to lock mutex") .get(name) ``` is of temporary lifetime, using this in `match` blocks any access to `scalar_functions` until we leave the match, which deadlocks when we recursively call the function. Here I just cloned `.scalar_functions` so that we allow the lock to be released. I may also be dead wrong on every word that I wrote above. This does work, but if you could validate my reasoning above, I would appreciate it very much! Note that we are also doing the same for `.datasources` in this file, which I suspect will also deadlock when we have a plan with two sources. I did not touch that as I do not know the idiom/pattern to address this (locking within recursions). An alternative solution for this is to not make `PhysicalPlanner::create_physical_plan` recursive, and instead call a recursive function (with all the current logic of `create_physical_plan`) with references to `datasources` and `scalar_functions`, so that they can be used recursively (and we do not have to lock on every recursion). Closes #8018 from jorgecarleitao/fix_deadlock Authored-by: Jorge C. 
Leitao <jorgecarlei...@gmail.com> Signed-off-by: Andy Grove <andygrov...@gmail.com> --- rust/datafusion/src/execution/context.rs | 68 +++++-------- rust/datafusion/src/execution/physical_plan/mod.rs | 2 +- .../src/execution/physical_plan/planner.rs | 111 ++++++--------------- rust/datafusion/src/optimizer/type_coercion.rs | 25 ++--- rust/datafusion/src/sql/planner.rs | 10 +- rust/datafusion/tests/sql.rs | 13 +++ 6 files changed, 85 insertions(+), 144 deletions(-) diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index dac3215..6a0b150 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -106,8 +106,8 @@ impl ExecutionContext { pub fn with_config(config: ExecutionConfig) -> Self { let mut ctx = Self { state: Arc::new(Mutex::new(ExecutionContextState { - datasources: Arc::new(Mutex::new(HashMap::new())), - scalar_functions: Arc::new(Mutex::new(HashMap::new())), + datasources: Box::new(HashMap::new()), + scalar_functions: Box::new(HashMap::new()), config, })), }; @@ -191,22 +191,18 @@ impl ExecutionContext { // create a query planner let state = self.state.lock().expect("failed to lock mutex"); - let query_planner = SqlToRel::new(state.clone()); + let query_planner = SqlToRel::new(&*state); Ok(query_planner.statement_to_plan(&statements[0])?) 
} /// Register a scalar UDF pub fn register_udf(&mut self, f: ScalarFunction) { - let state = self.state.lock().expect("failed to lock mutex"); - state - .scalar_functions - .lock() - .expect("failed to lock mutex") - .insert(f.name.clone(), Box::new(f)); + let mut state = self.state.lock().expect("failed to lock mutex"); + state.scalar_functions.insert(f.name.clone(), Box::new(f)); } /// Get a reference to the registered scalar functions - pub fn scalar_functions(&self) -> Arc<Mutex<HashMap<String, Box<ScalarFunction>>>> { + pub fn scalar_functions(&self) -> Box<HashMap<String, Box<ScalarFunction>>> { self.state .lock() .expect("failed to lock mutex") @@ -281,12 +277,8 @@ impl ExecutionContext { name: &str, provider: Box<dyn TableProvider + Send + Sync>, ) { - let state = self.state.lock().expect("failed to lock mutex"); - state - .datasources - .lock() - .expect("failed to lock mutex") - .insert(name.to_string(), provider); + let mut ctx_state = self.state.lock().expect("failed to lock mutex"); + ctx_state.datasources.insert(name.to_string(), provider); } /// Retrieves a DataFrame representing a table previously registered by calling the @@ -298,8 +290,6 @@ impl ExecutionContext { .lock() .expect("failed to lock mutex") .datasources - .lock() - .expect("failed to lock mutex") .get(table_name) { Some(provider) => { @@ -329,8 +319,6 @@ impl ExecutionContext { .lock() .expect("failed to lock mutex") .datasources - .lock() - .expect("failed to lock mutex") .keys() .cloned() .collect() @@ -346,7 +334,7 @@ impl ExecutionContext { .lock() .expect("failed to lock mutex") .scalar_functions - .clone(), + .as_ref(), )), ]; let mut plan = plan.clone(); @@ -366,7 +354,8 @@ impl ExecutionContext { Some(planner) => planner, None => Arc::new(DefaultPhysicalPlanner::default()), }; - planner.create_physical_plan(logical_plan, self.state.clone()) + let ctx_state = self.state.lock().expect("Failed to aquire lock"); + planner.create_physical_plan(logical_plan, &ctx_state) } /// 
Execute a physical plan and collect the results in memory @@ -495,38 +484,29 @@ impl ExecutionConfig { } /// Execution context for registering data sources and executing queries -#[derive(Clone)] pub struct ExecutionContextState { /// Data sources that are registered with the context - pub datasources: Arc<Mutex<HashMap<String, Box<dyn TableProvider + Send + Sync>>>>, + pub datasources: Box<HashMap<String, Box<dyn TableProvider + Send + Sync>>>, /// Scalar functions that are registered with the context - pub scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>, + pub scalar_functions: Box<HashMap<String, Box<ScalarFunction>>>, /// Context configuration pub config: ExecutionConfig, } impl SchemaProvider for ExecutionContextState { fn get_table_meta(&self, name: &str) -> Option<SchemaRef> { - self.datasources - .lock() - .expect("failed to lock mutex") - .get(name) - .map(|ds| ds.schema().clone()) + self.datasources.get(name).map(|ds| ds.schema().clone()) } fn get_function_meta(&self, name: &str) -> Option<Arc<FunctionMeta>> { - self.scalar_functions - .lock() - .expect("failed to lock mutex") - .get(name) - .map(|f| { - Arc::new(FunctionMeta::new( - name.to_owned(), - f.args.clone(), - f.return_type.clone(), - FunctionType::Scalar, - )) - }) + self.scalar_functions.get(name).map(|f| { + Arc::new(FunctionMeta::new( + name.to_owned(), + f.args.clone(), + f.return_type.clone(), + FunctionType::Scalar, + )) + }) } } @@ -641,8 +621,6 @@ mod tests { .lock() .expect("failed to lock mutex") .datasources - .lock() - .expect("failed to lock mutex") .get("test") .unwrap() .schema(); @@ -1127,7 +1105,7 @@ mod tests { fn create_physical_plan( &self, _logical_plan: &LogicalPlan, - _ctx_state: Arc<Mutex<ExecutionContextState>>, + _ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn ExecutionPlan>> { Err(ExecutionError::NotImplemented( "query not supported".to_string(), diff --git a/rust/datafusion/src/execution/physical_plan/mod.rs 
b/rust/datafusion/src/execution/physical_plan/mod.rs index 191d256..817b6ab 100644 --- a/rust/datafusion/src/execution/physical_plan/mod.rs +++ b/rust/datafusion/src/execution/physical_plan/mod.rs @@ -40,7 +40,7 @@ pub trait PhysicalPlanner { fn create_physical_plan( &self, logical_plan: &LogicalPlan, - ctx_state: Arc<Mutex<ExecutionContextState>>, + ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn ExecutionPlan>>; } diff --git a/rust/datafusion/src/execution/physical_plan/planner.rs b/rust/datafusion/src/execution/physical_plan/planner.rs index ddb0789..2e83cd0 100644 --- a/rust/datafusion/src/execution/physical_plan/planner.rs +++ b/rust/datafusion/src/execution/physical_plan/planner.rs @@ -17,7 +17,7 @@ //! Physical query planner -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use crate::error::{ExecutionError, Result}; use crate::execution::context::ExecutionContextState; @@ -59,27 +59,16 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { fn create_physical_plan( &self, logical_plan: &LogicalPlan, - ctx_state: Arc<Mutex<ExecutionContextState>>, + ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn ExecutionPlan>> { - let batch_size = ctx_state - .lock() - .expect("failed to lock mutex") - .config - .batch_size; + let batch_size = ctx_state.config.batch_size; match logical_plan { LogicalPlan::TableScan { table_name, projection, .. - } => match ctx_state - .lock() - .expect("failed to lock mutex") - .datasources - .lock() - .expect("failed to lock mutex") - .get(table_name) - { + } => match ctx_state.datasources.get(table_name) { Some(provider) => { let partitions = provider.scan(projection, batch_size)?; if partitions.is_empty() { @@ -139,17 +128,13 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { batch_size, )?)), LogicalPlan::Projection { input, expr, .. 
} => { - let input = self.create_physical_plan(input, ctx_state.clone())?; + let input = self.create_physical_plan(input, ctx_state)?; let input_schema = input.as_ref().schema().clone(); let runtime_expr = expr .iter() .map(|e| { tuple_err(( - self.create_physical_expr( - e, - &input_schema, - ctx_state.clone(), - ), + self.create_physical_expr(e, &input_schema, &ctx_state), e.name(&input_schema), )) }) @@ -163,18 +148,14 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { .. } => { // Initially need to perform the aggregate and then merge the partitions - let input = self.create_physical_plan(input, ctx_state.clone())?; + let input = self.create_physical_plan(input, ctx_state)?; let input_schema = input.as_ref().schema().clone(); let groups = group_expr .iter() .map(|e| { tuple_err(( - self.create_physical_expr( - e, - &input_schema, - ctx_state.clone(), - ), + self.create_physical_expr(e, &input_schema, ctx_state), e.name(&input_schema), )) }) @@ -183,11 +164,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { .iter() .map(|e| { tuple_err(( - self.create_aggregate_expr( - e, - &input_schema, - ctx_state.clone(), - ), + self.create_aggregate_expr(e, &input_schema, ctx_state), e.name(&input_schema), )) }) @@ -209,11 +186,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { let merge = Arc::new(MergeExec::new( schema.clone(), partitions, - ctx_state - .lock() - .expect("failed to lock mutex") - .config - .concurrency, + ctx_state.config.concurrency, )); // construct the expressions for the final aggregation @@ -241,17 +214,14 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { LogicalPlan::Filter { input, predicate, .. 
} => { - let input = self.create_physical_plan(input, ctx_state.clone())?; + let input = self.create_physical_plan(input, ctx_state)?; let input_schema = input.as_ref().schema().clone(); - let runtime_expr = self.create_physical_expr( - predicate, - &input_schema, - ctx_state.clone(), - )?; + let runtime_expr = + self.create_physical_expr(predicate, &input_schema, ctx_state)?; Ok(Arc::new(FilterExec::try_new(runtime_expr, input)?)) } LogicalPlan::Sort { expr, input, .. } => { - let input = self.create_physical_plan(input, ctx_state.clone())?; + let input = self.create_physical_plan(input, ctx_state)?; let input_schema = input.as_ref().schema().clone(); let sort_expr = expr @@ -268,7 +238,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { descending: !*asc, nulls_first: *nulls_first, }, - ctx_state.clone(), + ctx_state, ), _ => Err(ExecutionError::ExecutionError( "Sort only accepts sort expressions".to_string(), @@ -279,26 +249,18 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { Ok(Arc::new(SortExec::try_new( sort_expr, input, - ctx_state - .lock() - .expect("failed to lock mutex") - .config - .concurrency, + ctx_state.config.concurrency, )?)) } LogicalPlan::Limit { input, n, .. 
} => { - let input = self.create_physical_plan(input, ctx_state.clone())?; + let input = self.create_physical_plan(input, ctx_state)?; let input_schema = input.as_ref().schema().clone(); Ok(Arc::new(GlobalLimitExec::new( input_schema.clone(), input.partitions()?, *n, - ctx_state - .lock() - .expect("failed to lock mutex") - .config - .concurrency, + ctx_state.config.concurrency, ))) } LogicalPlan::Explain { @@ -307,7 +269,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { stringified_plans, schema, } => { - let input = self.create_physical_plan(plan, ctx_state.clone())?; + let input = self.create_physical_plan(plan, ctx_state)?; let mut stringified_plans = stringified_plans .iter() @@ -338,11 +300,11 @@ impl DefaultPhysicalPlanner { &self, e: &Expr, input_schema: &Schema, - ctx_state: Arc<Mutex<ExecutionContextState>>, + ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn PhysicalExpr>> { match e { Expr::Alias(expr, ..) => { - Ok(self.create_physical_expr(expr, input_schema, ctx_state.clone())?) + Ok(self.create_physical_expr(expr, input_schema, ctx_state)?) 
} Expr::Column(name) => { // check that name exists @@ -351,12 +313,12 @@ impl DefaultPhysicalPlanner { } Expr::Literal(value) => Ok(Arc::new(Literal::new(value.clone()))), Expr::BinaryExpr { left, op, right } => Ok(Arc::new(BinaryExpr::new( - self.create_physical_expr(left, input_schema, ctx_state.clone())?, + self.create_physical_expr(left, input_schema, ctx_state)?, op.clone(), - self.create_physical_expr(right, input_schema, ctx_state.clone())?, + self.create_physical_expr(right, input_schema, ctx_state)?, ))), Expr::Cast { expr, data_type } => Ok(Arc::new(CastExpr::try_new( - self.create_physical_expr(expr, input_schema, ctx_state.clone())?, + self.create_physical_expr(expr, input_schema, ctx_state)?, input_schema, data_type.clone(), )?)), @@ -364,14 +326,7 @@ impl DefaultPhysicalPlanner { name, args, return_type, - } => match ctx_state - .lock() - .expect("failed to lock mutex") - .scalar_functions - .lock() - .expect("failed to lock mutex") - .get(name) - { + } => match ctx_state.scalar_functions.get(name) { Some(f) => { let mut physical_args = vec![]; for e in args { @@ -405,7 +360,7 @@ impl DefaultPhysicalPlanner { &self, e: &Expr, input_schema: &Schema, - ctx_state: Arc<Mutex<ExecutionContextState>>, + ctx_state: &ExecutionContextState, ) -> Result<Arc<dyn AggregateExpr>> { match e { Expr::AggregateFunction { name, args, .. 
} => { @@ -413,27 +368,27 @@ impl DefaultPhysicalPlanner { "sum" => Ok(Arc::new(Sum::new(self.create_physical_expr( &args[0], input_schema, - ctx_state.clone(), + ctx_state, )?))), "avg" => Ok(Arc::new(Avg::new(self.create_physical_expr( &args[0], input_schema, - ctx_state.clone(), + ctx_state, )?))), "max" => Ok(Arc::new(Max::new(self.create_physical_expr( &args[0], input_schema, - ctx_state.clone(), + ctx_state, )?))), "min" => Ok(Arc::new(Min::new(self.create_physical_expr( &args[0], input_schema, - ctx_state.clone(), + ctx_state, )?))), "count" => Ok(Arc::new(Count::new(self.create_physical_expr( &args[0], input_schema, - ctx_state.clone(), + ctx_state, )?))), other => Err(ExecutionError::NotImplemented(format!( "Unsupported aggregate function '{}'", @@ -454,10 +409,10 @@ impl DefaultPhysicalPlanner { e: &Expr, input_schema: &Schema, options: SortOptions, - ctx_state: Arc<Mutex<ExecutionContextState>>, + ctx_state: &ExecutionContextState, ) -> Result<PhysicalSortExpr> { Ok(PhysicalSortExpr { - expr: self.create_physical_expr(e, input_schema, ctx_state.clone())?, + expr: self.create_physical_expr(e, input_schema, ctx_state)?, options: options, }) } diff --git a/rust/datafusion/src/optimizer/type_coercion.rs b/rust/datafusion/src/optimizer/type_coercion.rs index df4818c..22f3ef0 100644 --- a/rust/datafusion/src/optimizer/type_coercion.rs +++ b/rust/datafusion/src/optimizer/type_coercion.rs @@ -21,7 +21,7 @@ //! float)`. This keeps the runtime query execution code much simpler. use std::collections::HashMap; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use arrow::datatypes::Schema; @@ -37,16 +37,16 @@ use utils::optimize_explain; /// /// This optimizer does not alter the structure of the plan, it only changes expressions on it. 
pub struct TypeCoercionRule { - scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>, + scalar_functions: Arc<HashMap<String, Box<ScalarFunction>>>, } impl TypeCoercionRule { /// Create a new type coercion optimizer rule using meta-data about registered /// scalar functions - pub fn new( - scalar_functions: Arc<Mutex<HashMap<String, Box<ScalarFunction>>>>, - ) -> Self { - Self { scalar_functions } + pub fn new(scalar_functions: &HashMap<String, Box<ScalarFunction>>) -> Self { + Self { + scalar_functions: Arc::new(scalar_functions.clone()), + } } /// Rewrite an expression to include explicit CAST operations when required @@ -73,12 +73,7 @@ impl TypeCoercionRule { } Expr::ScalarFunction { name, .. } => { // cast the inputs of scalar functions to the appropriate type where possible - match self - .scalar_functions - .lock() - .expect("failed to lock mutex") - .get(name) - { + match self.scalar_functions.get(name) { Some(func_meta) => { for i in 0..expressions.len() { let field = &func_meta.args[i]; @@ -172,7 +167,7 @@ mod tests { .build()?; let scalar_functions = HashMap::new(); - let mut rule = TypeCoercionRule::new(Arc::new(Mutex::new(scalar_functions))); + let mut rule = TypeCoercionRule::new(&scalar_functions); let plan = rule.optimize(&plan)?; // check that the filter had a cast added @@ -199,7 +194,7 @@ mod tests { .build()?; let scalar_functions = HashMap::new(); - let mut rule = TypeCoercionRule::new(Arc::new(Mutex::new(scalar_functions))); + let mut rule = TypeCoercionRule::new(&scalar_functions); let plan = rule.optimize(&plan)?; assert!(format!("{:?}", plan).starts_with("Filter: CAST(#c7 AS Float64) Lt #c12")); @@ -276,7 +271,7 @@ mod tests { }; let ctx = ExecutionContext::new(); - let rule = TypeCoercionRule::new(ctx.scalar_functions()); + let rule = TypeCoercionRule::new(ctx.scalar_functions().as_ref()); let expr2 = rule.rewrite_expr(&expr, &schema).unwrap(); diff --git a/rust/datafusion/src/sql/planner.rs 
b/rust/datafusion/src/sql/planner.rs index c66d238..1735710 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -47,13 +47,13 @@ pub trait SchemaProvider { } /// SQL query planner -pub struct SqlToRel<S: SchemaProvider> { - schema_provider: S, +pub struct SqlToRel<'a, S: SchemaProvider> { + schema_provider: &'a S, } -impl<S: SchemaProvider> SqlToRel<S> { +impl<'a, S: SchemaProvider> SqlToRel<'a, S> { /// Create a new query planner - pub fn new(schema_provider: S) -> Self { + pub fn new(schema_provider: &'a S) -> Self { SqlToRel { schema_provider } } @@ -854,7 +854,7 @@ mod tests { } fn logical_plan(sql: &str) -> Result<LogicalPlan> { - let planner = SqlToRel::new(MockSchemaProvider {}); + let planner = SqlToRel::new(&MockSchemaProvider {}); let ast = DFParser::parse_sql(&sql).unwrap(); planner.statement_to_plan(&ast[0]) } diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs index 4ae90cd..942a781 100644 --- a/rust/datafusion/tests/sql.rs +++ b/rust/datafusion/tests/sql.rs @@ -201,6 +201,19 @@ fn csv_query_avg_sqrt() -> Result<()> { Ok(()) } +// this query used to deadlock due to the call udf(udf()) +#[test] +fn csv_query_sqrt_sqrt() -> Result<()> { + let mut ctx = create_ctx()?; + register_aggregate_csv(&mut ctx)?; + let sql = "SELECT sqrt(sqrt(c12)) FROM aggregate_test_100 LIMIT 1"; + let actual = execute(&mut ctx, sql); + // sqrt(sqrt(c12=0.9294097332465232)) = 0.9818650561397431 + let expected = "0.9818650561397431".to_string(); + assert_eq!(actual.join("\n"), expected); + Ok(()) +} + fn create_ctx() -> Result<ExecutionContext> { let mut ctx = ExecutionContext::new();