alamb opened a new issue, #17718: URL: https://github.com/apache/datafusion/issues/17718
## Is your feature request related to a problem or challenge? Please describe what you are trying to do.

This ticket is to coordinate discussion and ideas for improving DataFusion join access path (aka algorithm) and join order selection.

DataFusion does not currently have a sophisticated cost based optimizer, but has support for the mechanics of join reordering and statistics. There seems to be broad desire to improve DataFusion for more advanced use cases (e.g. TPC-DS / distributed joins / ASOF joins).

However, as described below, there are many tradeoffs involved with join access path and join order selection (e.g. https://github.com/apache/datafusion/issues/17432 recently), so it is not likely that any one implementation will be good for all users.

Instead, I would like to discuss adding an API that users can use to customize the behavior to their use case.

## Current State of DataFusion Optimizer:

Quoting from ["Optimizing SQL (and DataFrames) in DataFusion, Part 2: Optimizers in Apache DataFusion"](https://datafusion.apache.org/blog/2025/06/15/optimizing-sql-dataframes-part-two/#access-path-and-join-order-selection)

DataFusion purposely does not include a sophisticated cost based optimizer. Instead, keeping with its [design goals] it provides a reasonable default implementation along with extension points to customize behavior.

[design goals]: https://docs.rs/datafusion/latest/datafusion/#design-goals

Specifically, DataFusion includes

1. “Syntactic Optimizer” (joins in the order they are listed in the query) with basic join re-ordering ([source](https://github.com/apache/datafusion/blob/main/datafusion/physical-optimizer/src/join_selection.rs)) to prevent join disasters.
2. Support for [ColumnStatistics](https://docs.rs/datafusion/latest/datafusion/common/struct.ColumnStatistics.html) and [Table Statistics](https://docs.rs/datafusion/latest/datafusion/common/struct.Statistics.html)
3. The framework for [filter selectivity](https://docs.rs/datafusion/latest/datafusion/physical_expr/struct.AnalysisContext.html#structfield.selectivity) + join cardinality estimation.
4. APIs for easily rewriting plans, such as the [TreeNode API](https://docs.rs/datafusion/latest/datafusion/common/tree_node/trait.TreeNode.html#overview) and [reordering joins](https://docs.rs/datafusion/latest/datafusion/physical_plan/joins/struct.HashJoinExec.html#method.swap_inputs)

This combination of features along with [custom optimizer passes] lets users customize the behavior to their use case, such as custom indexes like [uWheel] and [materialized views].

[custom optimizer passes]: https://docs.rs/datafusion/latest/datafusion/execution/session_state/struct.SessionStateBuilder.html#method.with_physical_optimizer_rule
[uWheel]: https://uwheel.rs/post/datafusion_uwheel/
[materialized views]: https://github.com/datafusion-contrib/datafusion-materialized-views

The rationale for including only a basic optimizer is that any one particular set of heuristics and cost model is unlikely to work well for the wide variety of DataFusion users because of the tradeoffs involved.

For example, some users may always have access to adequate resources, want the fastest query execution, and are willing to tolerate runtime errors or a performance cliff when there is insufficient memory. Other users, however, may be willing to accept a slower maximum performance in return for more predictable performance when running in a resource constrained environment. This approach is not universally agreed upon.
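To make the "basic join re-ordering to prevent join disasters" idea concrete, here is a toy sketch of the kind of decision such a pass makes: swap the two inputs of a join when statistics say the left side is larger. This is not DataFusion's actual code. `ToyStats`, `ToyJoin`, `should_swap`, and `reorder` are hypothetical names invented for illustration, and the sketch assumes the left input is the build side and that an unknown row count means "do not swap".

```rust
/// Hypothetical, simplified stand-in for table statistics.
/// `num_rows` is `None` when the row count is unknown.
#[derive(Debug, Clone, PartialEq)]
struct ToyStats {
    num_rows: Option<u64>,
}

/// Hypothetical two-input join: each side is a (table name, statistics) pair.
/// Assumption for this sketch: `left` is the build side of a hash join.
#[derive(Debug, Clone, PartialEq)]
struct ToyJoin {
    left: (String, ToyStats),
    right: (String, ToyStats),
}

/// Swap only when both row counts are known and the left side is strictly
/// larger. With missing statistics, keep the order written in the query.
fn should_swap(left: &ToyStats, right: &ToyStats) -> bool {
    match (left.num_rows, right.num_rows) {
        (Some(l), Some(r)) => l > r,
        _ => false,
    }
}

/// Return the join with the smaller known input on the left (build) side.
fn reorder(join: ToyJoin) -> ToyJoin {
    if should_swap(&join.left.1, &join.right.1) {
        ToyJoin {
            left: join.right,
            right: join.left,
        }
    } else {
        join
    }
}
```

A user who prefers predictable memory usage over peak speed might want a different rule here (for example, comparing byte sizes instead of row counts), which is the kind of tradeoff that motivates a pluggable API.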
## Use cases

Here are some use cases that have been discussed in various issues and PRs:

- Choose different join algorithms based on tradeoffs, such as memory usage vs CPU usage: https://github.com/apache/datafusion/issues/17432
- Distributed join planning (e.g. choosing between resegment or broadcast joins) where the join order might be different based on the distribution of the source data
- Join optimization research, such as join reordering based on different statistics / cost models (e.g. [optd](https://db.cs.cmu.edu/projects/optd/) from CMUDB) or different join algorithms
- Add new join implementations such as `ASOF` / specialized temporal joins: https://github.com/apache/datafusion/issues/318

## Describe the solution you'd like

I would like

1. An API for join access path, algorithm, and join order selection that is flexible enough to support the use cases above, and can be extended / customized by users
2. Documentation and examples of how to use the API
3. A default implementation that works reasonably well (e.g. prevents join disasters in TPCH with basic statistics), and respects the same basic knobs we have today

Examples of existing [configuration settings](https://datafusion.apache.org/user-guide/configs.html):

* repartition_joins
* allow_symmetric_joins_without_pruning
* prefer_hash_join
* hash_join_single_partition_threshold
* hash_join_single_partition_threshold_rows

## Describe alternatives you've considered

One idea (from https://github.com/apache/datafusion/pull/17467#pullrequestreview-3201543429, a discussion with @2010YOUY01):

Maybe we could make a `JoinPlanner` trait that can be registered with the `SessionContext` or the Optimizer the same way as ExtensionPlanners?
Then we can provide a default JoinPlanner (what currently exists) that has its own config namespace, etc.

Something like this

```rust
trait JoinPlanner {
    // plan the initial join when converting from Logical --> Physical join
    fn plan_initial_join(
        session_state: &SessionState,
        physical_left: Arc<dyn ExecutionPlan>,
        physical_right: Arc<dyn ExecutionPlan>,
        join_on: join_utils::JoinOn,
        join_filter: Option<join_utils::JoinFilter>,
        join_type: &JoinType,
        null_equality: &datafusion_common::NullEquality,
    ) -> Arc<dyn ExecutionPlan>;

    // TODO other APIs here
}
```

Another possibility is to introduce a `JoinGraph` structure (TODO ticket) with the relevant APIs for walking and building plans

```rust
trait JoinPlanner {
    // plan a specific join graph when converting from Logical --> Physical join
    fn plan_join_graph(
        session_state: &SessionState,
        join_graph: JoinGraph,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}

/// represents a join graph
struct JoinGraph { ... }

impl JoinGraph {
    fn new_from_plan(logical_plan: LogicalPlan) -> Result<Self> {..}
    ...
}
```

### Join Graphs

This is from the [google AI summary](https://www.google.com/search?q=joingraph+data+structure+database+definition+%28not+graph+database), which is pretty good:

A join graph in the context of relational databases is a data structure that represents the relationships and potential join operations between tables in a relational database schema. It is a conceptual tool used to understand and visualize how different tables can be combined to answer a query.

Here's a breakdown of its definition:

* [Vertices (Nodes):](https://www.google.com/search?sca_esv=3cba3ff7c6207a53&cs=1&q=Vertices+%28Nodes%29) Each vertex in a join graph corresponds to a table (or relation) in the relational database schema.
* [Edges:](https://www.google.com/search?sca_esv=3cba3ff7c6207a53) An edge between two vertices (tables) signifies a potential join relationship between those tables. This relationship is typically based on common attributes (e.g., primary key-foreign key relationships) that allow data from the two tables to be combined.
* [Edge Labels (Join Conditions):](https://www.google.com/search?sca_esv=3cba3ff7c6207a53&cs=1&q=Edge+Labels+%28Join+Conditions%29) Edges are often labeled with the specific join conditions that define how the connected tables are to be joined. These conditions specify the attributes that must match for a successful join (e.g., TableA.ID = TableB.TableA_ID).

### Purpose:

Join graphs are used to:

* [Visualize Schema Relationships:](https://www.google.com/search?sca_esv=3cba3ff7c6207a53&cs=1&q=Visualize+Schema+Relationships&sa=X) They provide a clear visual representation of how different tables in a database are connected.
* [Plan Query Execution:](https://www.google.com/search?sca_esv=3cba3ff7c6207a53&cs=1&q=Plan+Query+Execution) Database query optimizers can use join graphs to identify efficient ways to execute queries involving multiple joins.

For example, TPCH Q3

```sql
select
    l_orderkey,
    sum(l_extendedprice * (1 - l_discount)) as revenue,
    o_orderdate,
    o_shippriority
from
    customer,
    orders,
    lineitem
where
    c_mktsegment = 'BUILDING'
    and c_custkey = o_custkey
    and l_orderkey = o_orderkey
    and o_orderdate < date '1995-03-15'
    and l_shipdate > date '1995-03-15'
group by
    l_orderkey,
    o_orderdate,
    o_shippriority
order by
    revenue desc,
    o_orderdate
limit 10;
```

can be represented by a `JoinGraph`

```mermaid
graph TD
    customer["customer"]
    orders["orders"]
    lineitem["lineitem"]
    customer -->|c_custkey = o_custkey| orders
    orders -->|o_orderkey = l_orderkey| lineitem
```

## Additional context

Related discussions

- https://github.com/apache/datafusion/issues/3929
- https://github.com/apache/datafusion/issues/9846#issuecomment-2566568654
- https://github.com/apache/datafusion/issues/17432

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
