alamb commented on a change in pull request #8097: URL: https://github.com/apache/arrow/pull/8097#discussion_r484891048
########## File path: rust/datafusion/src/execution/context.rs ########## @@ -383,24 +383,56 @@ impl ScalarFunctionRegistry for ExecutionContext { } } +/// A planner used to add extensions to DataFusion logical and phusical plans. Review comment: FYI @andygrove here is the single trait you suggested (though I renamed it `QueryPlanner` rather than `UserDefinedQueryPlanner` ########## File path: rust/datafusion/src/physical_plan/planner.rs ########## @@ -331,6 +355,16 @@ impl DefaultPhysicalPlanner { let schema_ref = Arc::new(schema.as_ref().clone()); Ok(Arc::new(ExplainExec::new(schema_ref, stringified_plans))) } + LogicalPlan::Extension { node } => { + let inputs = node + .inputs() + .into_iter() + .map(|input_plan| self.create_physical_plan(input_plan, ctx_state)) + .collect::<Result<Vec<_>>>()?; + + self.extension_planner Review comment: I like this idea so much I filed a ticket to make it happen: https://issues.apache.org/jira/browse/ARROW-9940. I'll try and look into it later this week (given I am effectively starting to create my own extensions now for an internal project at work, that will be a good test case). ########## File path: rust/datafusion/src/execution/context.rs ########## @@ -288,9 +288,17 @@ impl ExecutionContext { /// Optimize the logical plan by applying optimizer rules pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> { - let plan = ProjectionPushDown::new().optimize(&plan)?; - let plan = FilterPushDown::new().optimize(&plan)?; - let plan = TypeCoercionRule::new(self).optimize(&plan)?; + let mut plan = ProjectionPushDown::new().optimize(&plan)?; + plan = FilterPushDown::new().optimize(&plan)?; + plan = TypeCoercionRule::new(self).optimize(&plan)?; + + // apply any user supplied rules + let mut rules = self.state.config.optimizer_rule_source.rules(); Review comment: I like this approach -- I will give it a try ########## File path: rust/datafusion/src/execution/context.rs ########## @@ -417,7 +445,16 @@ impl ExecutionConfig { mut self, physical_planner: Arc<dyn PhysicalPlanner>, ) -> Self { - self.physical_planner = Some(physical_planner); + self.physical_planner = physical_planner; + self + } + + /// Optional source of additional optimization passes + pub fn with_optimizer_rule_source( Review comment: As it turns out that @andygrove 's suggestion for a single planner trait has removed the need for this trait entirely 🎉 ########## File path: rust/datafusion/src/execution/context.rs ########## @@ -375,15 +380,39 @@ impl ScalarFunctionRegistry for ExecutionContext { } } +/// Provides OptimizerRule instances to Review comment: As it turns out your suggestion for a single planner trait has removed the need for this trait entirely 🎉 ########## File path: rust/datafusion/src/logical_plan/mod.rs ########## @@ -755,8 +756,72 @@ impl fmt::Debug for Expr { } } -/// The LogicalPlan represents different types of relations (such as Projection, -/// Filter, etc) and can be created by the SQL query planner and the DataFrame API. +/// This defines the interface for `LogicalPlan` nodes that can be +/// used to extend DataFusion with custom relational operators. +/// +/// See the example in +/// [user_defined_plan.rs](../../tests/user_defined_plan.rs) for an +/// example of how to use this extenison API +pub trait ExtensionPlanNode: Debug { + /// Return a reference to self as Any, to support dynamic downcasting + fn as_any(&self) -> &dyn Any; + + /// Return the the logical plan's inputs + fn inputs(&self) -> Vec<&LogicalPlan>; Review comment: I think children is a better name. I filed ticket to make names consistent: https://issues.apache.org/jira/browse/ARROW-9939. I'll try and get a PR up later today or maybe tomorrow ########## File path: rust/datafusion/tests/user_defined_plan.rs ########## @@ -0,0 +1,510 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains an end to end demonstration of creating +//! a user defined operator in DataFusion. +//! +//! Specifically, it shows how to define a `TopKNode` that implements +//! `ExtensionPlanNode`, add an OptimizerRule to rewrite a +//! `LogicalPlan` to use that node a `LogicalPlan`, create an +//! `ExecutionPlan` and finally produce results. +//! +//! # TopK Background: +//! +//! A "Top K" node is a common query optimization which is used for +//! queries such as "find the top 3 customers by revenue". The +//! (simplified) SQL for such a query might be: +//! +//! ```sql +//! CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) +//! STORED AS CSV location 'tests/customer.csv'; +//! +//! SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; +//! ``` +//! +//! And a naive plan would be: +//! +//! ``` +//! > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; +//! +--------------+----------------------------------------+ +//! | plan_type | plan | +//! +--------------+----------------------------------------+ +//! | logical_plan | Limit: 3 | +//! | | Sort: #revenue DESC NULLS FIRST | +//! | | Projection: #customer_id, #revenue | +//! | | TableScan: sales projection=None | +//! +--------------+----------------------------------------+ +//! ``` +//! +//! While this plan produces the correct answer, the careful reader +//! will note it fully sorts the input before discarding everything +//! other than the top 3 elements. +//! +//! The same answer can be produced by simply keeping track of the top +//! N elements, reducing the total amount of required buffer memory. +//! + +use arrow::{ + array::{Int64Array, StringArray}, + datatypes::SchemaRef, + error::ArrowError, + record_batch::{RecordBatch, RecordBatchReader}, + util::pretty::pretty_format_batches, +}; +use datafusion::{ + error::{ExecutionError, Result}, + execution::context::ExecutionContextState, + execution::context::QueryPlanner, + logical_plan::{Expr, LogicalPlan, UserDefinedLogicalNode}, + optimizer::{optimizer::OptimizerRule, utils::optimize_explain}, + physical_plan::{ + planner::{DefaultPhysicalPlanner, ExtensionPlanner}, + Distribution, ExecutionPlan, Partitioning, PhysicalPlanner, + }, + prelude::{ExecutionConfig, ExecutionContext}, +}; +use fmt::Debug; +use std::{ + any::Any, + collections::BTreeMap, + fmt, + sync::{Arc, Mutex}, +}; + +/// Execute the specified sql and return the resulting record batches +/// pretty printed as a String. +fn exec_sql(ctx: &mut ExecutionContext, sql: &str) -> Result<String> { + let df = ctx.sql(sql)?; + let batches = df.collect()?; + pretty_format_batches(&batches).map_err(|e| ExecutionError::ArrowError(e)) +} + +/// Create a test table. +fn setup_table(mut ctx: ExecutionContext) -> Result<ExecutionContext> { + let sql = "CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) STORED AS CSV location 'tests/customer.csv'"; + + let expected = vec!["++", "++"]; + + let s = exec_sql(&mut ctx, sql)?; + let actual = s.lines().collect::<Vec<_>>(); + + assert_eq!(expected, actual, "Creating table"); + Ok(ctx) +} + +const QUERY: &str = + "SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3"; + +// Run the query using the specified execution context and compare it +// to the known result +fn run_and_compare_query(mut ctx: ExecutionContext, description: &str) -> Result<()> { + let expected = vec![ + "+-------------+---------+", + "| customer_id | revenue |", + "+-------------+---------+", + "| paul | 300 |", + "| jorge | 200 |", + "| andy | 150 |", + "+-------------+---------+", + ]; + + let s = exec_sql(&mut ctx, QUERY)?; + let actual = s.lines().collect::<Vec<_>>(); + + assert_eq!( + expected, + actual, + "output mismatch for {}. Expectedn\n{}Actual:\n{}", + description, + expected.join("\n"), + s + ); + Ok(()) +} + +#[test] +// Run the query using default planners and optimizer +fn normal_query() -> Result<()> { + let ctx = setup_table(ExecutionContext::new())?; + run_and_compare_query(ctx, "Default context") +} + +#[test] +// Run the query using topk optimization +fn topk_query() -> Result<()> { + // Note the only difference is that the top + let ctx = setup_table(make_topk_context())?; + run_and_compare_query(ctx, "Topk context") +} + +#[test] +// Run EXPLAIN PLAN and show the plan was in fact rewritten +fn topk_plan() -> Result<()> { + let mut ctx = setup_table(make_topk_context())?; + + let expected = vec![ + "| logical_plan after topk | TopK: k=3 |", + "| | Projection: #customer_id, #revenue |", + "| | TableScan: sales projection=Some([0, 1]) |", + ].join("\n"); + + let explain_query = format!("EXPLAIN VERBOSE {}", QUERY); + let actual_output = exec_sql(&mut ctx, &explain_query)?; + + // normalize newlines (output on windows uses \r\n) + let actual_output = actual_output.replace("\r\n", "\n"); + + assert!(actual_output.contains(&expected) , "Expected output not present in actual output\nExpected:\n---------\n{}\nActual:\n--------\n{}", expected, actual_output); + Ok(()) +} + +fn make_topk_context() -> ExecutionContext { + let config = ExecutionConfig::new().with_query_planner(Arc::new(TopKQueryPlanner {})); + + ExecutionContext::with_config(config) +} + +// ------ The implementation of the TopK code follows ----- + +struct TopKQueryPlanner {} Review comment: Here is the updated way to register extensions with DataFusion ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org