This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8482e2e448 feat: support extension planner for `TableScan` (#20548)
8482e2e448 is described below

commit 8482e2e448912249fbd71210c8c8c73051480dc0
Author: Heran Lin <[email protected]>
AuthorDate: Sun Mar 1 16:17:32 2026 +0800

    feat: support extension planner for `TableScan` (#20548)
    
    ## Which issue does this PR close?
    
    - Closes #20547.
    
    ## Rationale for this change
    
    Please refer to the issue for context. This PR serves as a
    proof-of-concept and we can consider merging it if we reach consensus on
    the design discussed in the issue.
    
    ## What changes are included in this PR?
    
    The trait method `ExtensionPlanner::plan_table_scan()` is added so that
    the user can define physical planning logic for custom table sources.
    
    ## Are these changes tested?
    
    The changes are accompanied with unit tests.
    
    ## Are there any user-facing changes?
    
    Yes, a new trait method is added to `ExtensionPlanner`. This is not a
    breaking change since the trait method has a default implementation.
    
    ---------
    
    Co-authored-by: Jax Liu <[email protected]>
---
 datafusion/core/src/physical_planner.rs | 258 +++++++++++++++++++++++++++-----
 1 file changed, 221 insertions(+), 37 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 828b286407..14f3e5cf03 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -69,8 +69,8 @@ use datafusion_common::tree_node::{
     Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
 };
 use datafusion_common::{
-    DFSchema, ScalarValue, exec_err, internal_datafusion_err, internal_err, 
not_impl_err,
-    plan_err,
+    DFSchema, DFSchemaRef, ScalarValue, exec_err, internal_datafusion_err, 
internal_err,
+    not_impl_err, plan_err,
 };
 use datafusion_common::{
     TableReference, assert_eq_or_internal_err, assert_or_internal_err,
@@ -157,6 +157,80 @@ pub trait ExtensionPlanner {
         physical_inputs: &[Arc<dyn ExecutionPlan>],
         session_state: &SessionState,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
+
+    /// Create a physical plan for a [`LogicalPlan::TableScan`].
+    ///
+    /// This is useful for planning valid [`TableSource`]s that are not 
[`TableProvider`]s.
+    ///
+    /// Returns:
+    /// * `Ok(Some(plan))` if the planner knows how to plan the `scan`
+    /// * `Ok(None)` if the planner does not know how to plan the `scan` and 
wants to delegate the planning to another [`ExtensionPlanner`]
+    /// * `Err` if the planner knows how to plan the `scan` but errors while 
doing so
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// use std::sync::Arc;
+    /// use datafusion::physical_plan::ExecutionPlan;
+    /// use datafusion::logical_expr::TableScan;
+    /// use datafusion::execution::context::SessionState;
+    /// use datafusion::error::Result;
+    /// use datafusion_physical_planner::{ExtensionPlanner, PhysicalPlanner};
+    /// use async_trait::async_trait;
+    ///
+    /// // Your custom table source type
+    /// struct MyCustomTableSource { /* ... */ }
+    ///
+    /// // Your custom execution plan
+    /// struct MyCustomExec { /* ... */ }
+    ///
+    /// struct MyExtensionPlanner;
+    ///
+    /// #[async_trait]
+    /// impl ExtensionPlanner for MyExtensionPlanner {
+    ///     async fn plan_extension(
+    ///         &self,
+    ///         _planner: &dyn PhysicalPlanner,
+    ///         _node: &dyn UserDefinedLogicalNode,
+    ///         _logical_inputs: &[&LogicalPlan],
+    ///         _physical_inputs: &[Arc<dyn ExecutionPlan>],
+    ///         _session_state: &SessionState,
+    ///     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+    ///         Ok(None)
+    ///     }
+    ///
+    ///     async fn plan_table_scan(
+    ///         &self,
+    ///         _planner: &dyn PhysicalPlanner,
+    ///         scan: &TableScan,
+    ///         _session_state: &SessionState,
+    ///     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+    ///         // Check if this is your custom table source
+    ///         if scan.source.as_any().is::<MyCustomTableSource>() {
+    ///             // Create a custom execution plan for your table source
+    ///             let exec = MyCustomExec::new(
+    ///                 scan.table_name.clone(),
+    ///                 Arc::clone(scan.projected_schema.inner()),
+    ///             );
+    ///             Ok(Some(Arc::new(exec)))
+    ///         } else {
+    ///             // Return None to let other extension planners handle it
+    ///             Ok(None)
+    ///         }
+    ///     }
+    /// }
+    /// ```
+    ///
+    /// [`TableSource`]: datafusion_expr::TableSource
+    /// [`TableProvider`]: datafusion_catalog::TableProvider
+    async fn plan_table_scan(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        _scan: &TableScan,
+        _session_state: &SessionState,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        Ok(None)
+    }
 }
 
 /// Default single node physical query planner that converts a
@@ -278,7 +352,8 @@ struct LogicalNode<'a> {
 
 impl DefaultPhysicalPlanner {
     /// Create a physical planner that uses `extension_planners` to
-    /// plan user-defined logical nodes [`LogicalPlan::Extension`].
+    /// plan user-defined logical nodes [`LogicalPlan::Extension`]
+    /// or user-defined table sources in [`LogicalPlan::TableScan`].
     /// The planner uses the first [`ExtensionPlanner`] to return a non-`None`
     /// plan.
     pub fn with_extension_planners(
@@ -287,6 +362,24 @@ impl DefaultPhysicalPlanner {
         Self { extension_planners }
     }
 
+    fn ensure_schema_matches(
+        &self,
+        logical_schema: &DFSchemaRef,
+        physical_plan: &Arc<dyn ExecutionPlan>,
+        context: &str,
+    ) -> Result<()> {
+        if !logical_schema.matches_arrow_schema(&physical_plan.schema()) {
+            return plan_err!(
+                "{} created an ExecutionPlan with mismatched schema. \
+                    LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
+                context,
+                logical_schema,
+                physical_plan.schema()
+            );
+        }
+        Ok(())
+    }
+
     /// Create a physical plan from a logical plan
     async fn create_initial_plan(
         &self,
@@ -455,25 +548,53 @@ impl DefaultPhysicalPlanner {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec_node: Arc<dyn ExecutionPlan> = match node {
             // Leaves (no children)
-            LogicalPlan::TableScan(TableScan {
-                source,
-                projection,
-                filters,
-                fetch,
-                ..
-            }) => {
-                let source = source_as_provider(source)?;
-                // Remove all qualifiers from the scan as the provider
-                // doesn't know (nor should care) how the relation was
-                // referred to in the query
-                let filters = unnormalize_cols(filters.iter().cloned());
-                let filters_vec = filters.into_iter().collect::<Vec<_>>();
-                let opts = ScanArgs::default()
-                    .with_projection(projection.as_deref())
-                    .with_filters(Some(&filters_vec))
-                    .with_limit(*fetch);
-                let res = source.scan_with_args(session_state, opts).await?;
-                Arc::clone(res.plan())
+            LogicalPlan::TableScan(scan) => {
+                let TableScan {
+                    source,
+                    projection,
+                    filters,
+                    fetch,
+                    projected_schema,
+                    ..
+                } = scan;
+
+                if let Ok(source) = source_as_provider(source) {
+                    // Remove all qualifiers from the scan as the provider
+                    // doesn't know (nor should care) how the relation was
+                    // referred to in the query
+                    let filters = unnormalize_cols(filters.iter().cloned());
+                    let filters_vec = filters.into_iter().collect::<Vec<_>>();
+                    let opts = ScanArgs::default()
+                        .with_projection(projection.as_deref())
+                        .with_filters(Some(&filters_vec))
+                        .with_limit(*fetch);
+                    let res = source.scan_with_args(session_state, 
opts).await?;
+                    Arc::clone(res.plan())
+                } else {
+                    let mut maybe_plan = None;
+                    for planner in &self.extension_planners {
+                        if maybe_plan.is_some() {
+                            break;
+                        }
+
+                        maybe_plan =
+                            planner.plan_table_scan(self, scan, 
session_state).await?;
+                    }
+
+                    let plan = match maybe_plan {
+                        Some(plan) => plan,
+                        None => {
+                            return plan_err!(
+                                "No installed planner was able to plan 
TableScan for custom TableSource: {:?}",
+                                scan.table_name
+                            );
+                        }
+                    };
+                    let context =
+                        format!("Extension planner for table scan {}", 
scan.table_name);
+                    self.ensure_schema_matches(projected_schema, &plan, 
&context)?;
+                    plan
+                }
             }
             LogicalPlan::Values(Values { values, schema }) => {
                 let exprs = values
@@ -1616,20 +1737,9 @@ impl DefaultPhysicalPlanner {
                     ),
                 }?;
 
-                // Ensure the ExecutionPlan's schema matches the
-                // declared logical schema to catch and warn about
-                // logic errors when creating user defined plans.
-                if !node.schema().matches_arrow_schema(&plan.schema()) {
-                    return plan_err!(
-                        "Extension planner for {:?} created an ExecutionPlan 
with mismatched schema. \
-                            LogicalPlan schema: {:?}, ExecutionPlan schema: 
{:?}",
-                        node,
-                        node.schema(),
-                        plan.schema()
-                    );
-                } else {
-                    plan
-                }
+                let context = format!("Extension planner for {node:?}");
+                self.ensure_schema_matches(node.schema(), &plan, &context)?;
+                plan
             }
 
             // Other
@@ -2889,7 +2999,9 @@ mod tests {
     use datafusion_execution::TaskContext;
     use datafusion_execution::runtime_env::RuntimeEnv;
     use datafusion_expr::builder::subquery_alias;
-    use datafusion_expr::{LogicalPlanBuilder, UserDefinedLogicalNodeCore, col, 
lit};
+    use datafusion_expr::{
+        LogicalPlanBuilder, TableSource, UserDefinedLogicalNodeCore, col, lit,
+    };
     use datafusion_functions_aggregate::count::count_all;
     use datafusion_functions_aggregate::expr_fn::sum;
     use datafusion_physical_expr::EquivalenceProperties;
@@ -4413,4 +4525,76 @@ digraph {
         assert_contains!(&err_str, "field nullability at index");
         assert_contains!(&err_str, "field metadata at index");
     }
+
+    #[derive(Debug)]
+    struct MockTableSource {
+        schema: SchemaRef,
+    }
+
+    impl TableSource for MockTableSource {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn schema(&self) -> SchemaRef {
+            Arc::clone(&self.schema)
+        }
+    }
+
+    struct MockTableScanExtensionPlanner;
+
+    #[async_trait]
+    impl ExtensionPlanner for MockTableScanExtensionPlanner {
+        async fn plan_extension(
+            &self,
+            _planner: &dyn PhysicalPlanner,
+            _node: &dyn UserDefinedLogicalNode,
+            _logical_inputs: &[&LogicalPlan],
+            _physical_inputs: &[Arc<dyn ExecutionPlan>],
+            _session_state: &SessionState,
+        ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+            Ok(None)
+        }
+
+        async fn plan_table_scan(
+            &self,
+            _planner: &dyn PhysicalPlanner,
+            scan: &TableScan,
+            _session_state: &SessionState,
+        ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+            if scan.source.as_any().is::<MockTableSource>() {
+                Ok(Some(Arc::new(EmptyExec::new(Arc::clone(
+                    scan.projected_schema.inner(),
+                )))))
+            } else {
+                Ok(None)
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_table_scan_extension_planner() {
+        let session_state = make_session_state();
+        let planner = Arc::new(MockTableScanExtensionPlanner);
+        let physical_planner =
+            DefaultPhysicalPlanner::with_extension_planners(vec![planner]);
+
+        let schema = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Int32, false)]));
+
+        let table_source = Arc::new(MockTableSource {
+            schema: Arc::clone(&schema),
+        });
+        let logical_plan = LogicalPlanBuilder::scan("test", table_source, None)
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let plan = physical_planner
+            .create_physical_plan(&logical_plan, &session_state)
+            .await
+            .unwrap();
+
+        assert_eq!(plan.schema(), schema);
+        assert!(plan.as_any().is::<EmptyExec>());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to