This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new cb84504fe `LogicalPlanBuilder` now uses `TableSource` instead of `TableProvider` (#2569)
cb84504fe is described below

commit cb84504fed4e613c9ed18c4e2a2022c701add2d9
Author: Andy Grove <[email protected]>
AuthorDate: Sat May 21 05:15:43 2022 -0600

    `LogicalPlanBuilder` now uses `TableSource` instead of `TableProvider` (#2569)
    
    * add scan_empty method to tests
    
    * update tests to use new scan_empty test method
    
    * remove LogicalPlanBuilder::scan_empty
    
    * LogicalPlanBuilder now uses TableSource instead of TableProvider
---
 ballista/rust/core/src/serde/logical_plan/mod.rs  |  8 ++++----
 datafusion-examples/examples/custom_datasource.rs | 16 ++++++++++------
 datafusion/core/src/execution/context.rs          | 15 +++++++++------
 datafusion/core/src/logical_plan/builder.rs       | 16 ++++++++--------
 datafusion/core/src/sql/planner.rs                | 16 ++++++++++------
 datafusion/core/src/test_util.rs                  |  8 ++++++--
 datafusion/core/tests/parquet_pruning.rs          | 17 +++++++++++------
 datafusion/core/tests/sql/projection.rs           |  9 +++++----
 8 files changed, 63 insertions(+), 42 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs 
b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 070dad98a..f088f2f1f 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -32,9 +32,9 @@ use datafusion::logical_plan::plan::{
     Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, 
Window,
 };
 use datafusion::logical_plan::{
-    source_as_provider, Column, CreateCatalog, CreateCatalogSchema, 
CreateExternalTable,
-    CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, 
LogicalPlanBuilder,
-    Offset, Repartition, TableScan, Values,
+    provider_as_source, source_as_provider, Column, CreateCatalog, 
CreateCatalogSchema,
+    CreateExternalTable, CreateView, CrossJoin, Expr, JoinConstraint, Limit, 
LogicalPlan,
+    LogicalPlanBuilder, Offset, Repartition, TableScan, Values,
 };
 use datafusion::prelude::SessionContext;
 
@@ -252,7 +252,7 @@ impl AsLogicalPlan for LogicalPlanNode {
 
                 LogicalPlanBuilder::scan_with_filters(
                     &scan.table_name,
-                    Arc::new(provider),
+                    provider_as_source(Arc::new(provider)),
                     projection,
                     filters,
                 )?
diff --git a/datafusion-examples/examples/custom_datasource.rs 
b/datafusion-examples/examples/custom_datasource.rs
index a9a8ef7aa..a814e585e 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -23,7 +23,7 @@ use datafusion::dataframe::DataFrame;
 use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result;
 use datafusion::execution::context::TaskContext;
-use datafusion::logical_plan::{Expr, LogicalPlanBuilder};
+use datafusion::logical_plan::{provider_as_source, Expr, LogicalPlanBuilder};
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
@@ -60,11 +60,15 @@ async fn search_accounts(
     let ctx = SessionContext::new();
 
     // create logical plan composed of a single TableScan
-    let logical_plan =
-        LogicalPlanBuilder::scan_with_filters("accounts", Arc::new(db), None, 
vec![])
-            .unwrap()
-            .build()
-            .unwrap();
+    let logical_plan = LogicalPlanBuilder::scan_with_filters(
+        "accounts",
+        provider_as_source(Arc::new(db)),
+        None,
+        vec![],
+    )
+    .unwrap()
+    .build()
+    .unwrap();
 
     let mut dataframe = DataFrame::new(ctx.state, &logical_plan)
         .select_columns(&["id", "bank_account"])?;
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index ca3bca61d..629adf137 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -61,9 +61,9 @@ use crate::datasource::listing::ListingTableConfig;
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::{
-    CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
-    CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan, 
LogicalPlanBuilder,
-    UNNAMED_TABLE,
+    provider_as_source, CreateCatalog, CreateCatalogSchema, 
CreateExternalTable,
+    CreateMemoryTable, CreateView, DropTable, FileType, FunctionRegistry, 
LogicalPlan,
+    LogicalPlanBuilder, UNNAMED_TABLE,
 };
 use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
 use crate::optimizer::filter_push_down::FilterPushDown;
@@ -586,7 +586,9 @@ impl SessionContext {
             .with_schema(resolved_schema);
         let provider = ListingTable::try_new(config)?;
 
-        let plan = LogicalPlanBuilder::scan(path, Arc::new(provider), 
None)?.build()?;
+        let plan =
+            LogicalPlanBuilder::scan(path, 
provider_as_source(Arc::new(provider)), None)?
+                .build()?;
         Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
     }
 
@@ -620,7 +622,8 @@ impl SessionContext {
     pub fn read_table(&self, provider: Arc<dyn TableProvider>) -> 
Result<Arc<DataFrame>> {
         Ok(Arc::new(DataFrame::new(
             self.state.clone(),
-            &LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?.build()?,
+            &LogicalPlanBuilder::scan(UNNAMED_TABLE, 
provider_as_source(provider), None)?
+                .build()?,
         )))
     }
 
@@ -817,7 +820,7 @@ impl SessionContext {
             Some(ref provider) => {
                 let plan = LogicalPlanBuilder::scan(
                     table_ref.table(),
-                    Arc::clone(provider),
+                    provider_as_source(Arc::clone(provider)),
                     None,
                 )?
                 .build()?;
diff --git a/datafusion/core/src/logical_plan/builder.rs 
b/datafusion/core/src/logical_plan/builder.rs
index f8b1fefdd..cad1b0a6a 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -17,7 +17,6 @@
 
 //! This module provides a builder for creating LogicalPlans
 
-use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::logical_expr::ExprSchemable;
 use crate::logical_plan::plan::{
@@ -41,11 +40,12 @@ use std::{
 use super::{Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
 use crate::logical_plan::{
     columnize_expr, exprlist_to_fields, normalize_col, normalize_cols,
-    provider_as_source, rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, 
DFSchema,
-    DFSchemaRef, Limit, Offset, Partitioning, Repartition, Values,
+    rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, 
DFSchemaRef, Limit,
+    Offset, Partitioning, Repartition, Values,
 };
 
 use datafusion_common::ToDFSchema;
+use datafusion_expr::TableSource;
 
 /// Default table name for unnamed table
 pub const UNNAMED_TABLE: &str = "?table?";
@@ -191,16 +191,16 @@ impl LogicalPlanBuilder {
     /// Convert a table provider into a builder with a TableScan
     pub fn scan(
         table_name: impl Into<String>,
-        provider: Arc<dyn TableProvider>,
+        table_source: Arc<dyn TableSource>,
         projection: Option<Vec<usize>>,
     ) -> Result<Self> {
-        Self::scan_with_filters(table_name, provider, projection, vec![])
+        Self::scan_with_filters(table_name, table_source, projection, vec![])
     }
 
     /// Convert a table provider into a builder with a TableScan
     pub fn scan_with_filters(
         table_name: impl Into<String>,
-        provider: Arc<dyn TableProvider>,
+        table_source: Arc<dyn TableSource>,
         projection: Option<Vec<usize>>,
         filters: Vec<Expr>,
     ) -> Result<Self> {
@@ -212,7 +212,7 @@ impl LogicalPlanBuilder {
             ));
         }
 
-        let schema = provider.schema();
+        let schema = table_source.schema();
 
         let projected_schema = projection
             .as_ref()
@@ -232,7 +232,7 @@ impl LogicalPlanBuilder {
 
         let table_scan = LogicalPlan::TableScan(TableScan {
             table_name,
-            source: provider_as_source(provider),
+            source: table_source,
             projected_schema: Arc::new(projected_schema),
             projection,
             filters,
diff --git a/datafusion/core/src/sql/planner.rs 
b/datafusion/core/src/sql/planner.rs
index 9f96097ea..5113c9f5b 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -28,10 +28,11 @@ use crate::datasource::TableProvider;
 use crate::logical_plan::window_frames::{WindowFrame, WindowFrameUnits};
 use crate::logical_plan::Expr::Alias;
 use crate::logical_plan::{
-    and, col, lit, normalize_col, normalize_col_with_schemas, Column, 
CreateCatalog,
-    CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable,
-    CreateMemoryTable, CreateView, DFSchema, DFSchemaRef, DropTable, Expr, 
FileType,
-    LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, 
ToStringifiedPlan,
+    and, col, lit, normalize_col, normalize_col_with_schemas, 
provider_as_source,
+    union_with_alias, Column, CreateCatalog, CreateCatalogSchema,
+    CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, 
CreateView,
+    DFSchema, DFSchemaRef, DropTable, Expr, FileType, LogicalPlan, 
LogicalPlanBuilder,
+    Operator, PlanType, ToDFSchema, ToStringifiedPlan,
 };
 use crate::prelude::JoinType;
 use crate::scalar::ScalarValue;
@@ -714,8 +715,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                             _ => Ok(cte_plan.clone()),
                         },
                         (_, Ok(provider)) => {
-                            let scan =
-                                LogicalPlanBuilder::scan(&table_name, 
provider, None);
+                            let scan = LogicalPlanBuilder::scan(
+                                &table_name,
+                                provider_as_source(provider),
+                                None,
+                            );
                             let scan = match table_alias.as_ref() {
                                 Some(ref name) => 
scan?.alias(name.to_owned().as_str()),
                                 _ => scan,
diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs
index 3e174e9dc..1a6a5028e 100644
--- a/datafusion/core/src/test_util.rs
+++ b/datafusion/core/src/test_util.rs
@@ -21,7 +21,7 @@ use std::collections::BTreeMap;
 use std::{env, error::Error, path::PathBuf, sync::Arc};
 
 use crate::datasource::empty::EmptyTable;
-use crate::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE};
+use crate::logical_plan::{provider_as_source, LogicalPlanBuilder, 
UNNAMED_TABLE};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::DataFusionError;
 
@@ -243,7 +243,11 @@ pub fn scan_empty(
 ) -> Result<LogicalPlanBuilder, DataFusionError> {
     let table_schema = Arc::new(table_schema.clone());
     let provider = Arc::new(EmptyTable::new(table_schema));
-    LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), provider, 
projection)
+    LogicalPlanBuilder::scan(
+        name.unwrap_or(UNNAMED_TABLE),
+        provider_as_source(provider),
+        projection,
+    )
 }
 
 /// Get the schema for the aggregate_test_* csv files
diff --git a/datafusion/core/tests/parquet_pruning.rs 
b/datafusion/core/tests/parquet_pruning.rs
index 0d580f2d2..7e7caa959 100644
--- a/datafusion/core/tests/parquet_pruning.rs
+++ b/datafusion/core/tests/parquet_pruning.rs
@@ -30,6 +30,7 @@ use arrow::{
     util::pretty::pretty_format_batches,
 };
 use chrono::{Datelike, Duration};
+use datafusion::logical_plan::provider_as_source;
 use datafusion::{
     datasource::TableProvider,
     logical_plan::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder},
@@ -544,12 +545,16 @@ impl ContextWithParquet {
     /// the number of output rows and normalized execution metrics
     async fn query_with_expr(&mut self, expr: Expr) -> TestOutput {
         let sql = format!("EXPR only: {:?}", expr);
-        let logical_plan = LogicalPlanBuilder::scan("t", 
self.provider.clone(), None)
-            .unwrap()
-            .filter(expr)
-            .unwrap()
-            .build()
-            .unwrap();
+        let logical_plan = LogicalPlanBuilder::scan(
+            "t",
+            provider_as_source(self.provider.clone()),
+            None,
+        )
+        .unwrap()
+        .filter(expr)
+        .unwrap()
+        .build()
+        .unwrap();
         self.run_test(logical_plan, sql).await
     }
 
diff --git a/datafusion/core/tests/sql/projection.rs 
b/datafusion/core/tests/sql/projection.rs
index e1b1742bf..c74445bfd 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::logical_plan::{LogicalPlanBuilder, UNNAMED_TABLE};
+use datafusion::logical_plan::{provider_as_source, LogicalPlanBuilder, 
UNNAMED_TABLE};
 use datafusion::test_util::scan_empty;
 use tempfile::TempDir;
 
@@ -239,9 +239,10 @@ async fn projection_on_memory_scan() -> Result<()> {
     )?]];
 
     let provider = Arc::new(MemTable::try_new(schema, partitions)?);
-    let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?
-        .project(vec![col("b")])?
-        .build()?;
+    let plan =
+        LogicalPlanBuilder::scan(UNNAMED_TABLE, provider_as_source(provider), 
None)?
+            .project(vec![col("b")])?
+            .build()?;
     assert_fields_eq(&plan, vec!["b"]);
 
     let ctx = SessionContext::new();

Reply via email to