This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 19d937a0f Add `CREATE VIEW` (#2279)
19d937a0f is described below

commit 19d937a0f3c99be361df4bc912cc340a6196634a
Author: Matthew Turner <[email protected]>
AuthorDate: Wed May 11 12:03:27 2022 -0400

    Add `CREATE VIEW` (#2279)
    
    * Initial commit
    
    * First passing test
    
    * Add OR REPLACE and more tests
    
    * Update doc comment
    
    * More tests
    
    * Add CreateView to Ballista
    
    * Include Q15 for TPCH
    
    * Ignore q15
    
    * Delete view physical plan
---
 ballista/rust/core/proto/ballista.proto            |   7 +
 ballista/rust/core/src/serde/logical_plan/mod.rs   |  33 +-
 datafusion-examples/examples/custom_datasource.rs  |   6 +-
 datafusion/core/src/datasource/datasource.rs       |   6 +-
 datafusion/core/src/datasource/empty.rs            |   6 +-
 datafusion/core/src/datasource/listing/table.rs    |   6 +-
 datafusion/core/src/datasource/memory.rs           |   6 +-
 datafusion/core/src/datasource/mod.rs              |   2 +
 datafusion/core/src/datasource/view.rs             | 364 +++++++++++++++++++++
 datafusion/core/src/execution/context.rs           |  36 +-
 datafusion/core/src/logical_plan/mod.rs            |   7 +-
 datafusion/core/src/logical_plan/plan.rs           |   4 +-
 .../core/src/optimizer/common_subexpr_eliminate.rs |   1 +
 datafusion/core/src/optimizer/filter_push_down.rs  |   6 +-
 .../core/src/optimizer/projection_push_down.rs     |   1 +
 datafusion/core/src/optimizer/utils.rs             |  13 +-
 datafusion/core/src/physical_plan/planner.rs       |   2 +-
 datafusion/core/src/sql/planner.rs                 |  19 +-
 datafusion/core/tests/custom_sources.rs            |   9 +-
 datafusion/core/tests/provider_filter_pushdown.rs  |   6 +-
 datafusion/core/tests/statistics.rs                |   6 +-
 datafusion/expr/src/logical_plan/mod.rs            |   8 +-
 datafusion/expr/src/logical_plan/plan.rs           |  29 +-
 23 files changed, 547 insertions(+), 36 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto 
b/ballista/rust/core/proto/ballista.proto
index 1e7901403..e7821dc10 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -53,6 +53,7 @@ message LogicalPlanNode {
     UnionNode union = 19;
     CreateCatalogNode create_catalog = 20;
     SubqueryAliasNode subquery_alias = 21;
+    CreateViewNode create_view = 22;
   }
 }
 
@@ -171,6 +172,12 @@ message CreateCatalogNode {
   datafusion.DfSchema schema = 3;
 }
 
+message CreateViewNode {
+  string name = 1;
+  LogicalPlanNode input = 2;
+  bool or_replace = 3;
+}
+
 // a node containing data for defining values list. unlike in SQL where it's 
two dimensional, here
 // the list is flattened, and with the field n_cols it can be parsed and 
partitioned into rows
 message ValuesNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs 
b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 5307aff65..e6d9e6289 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -33,8 +33,8 @@ use datafusion::logical_plan::plan::{
 };
 use datafusion::logical_plan::{
     source_as_provider, Column, CreateCatalog, CreateCatalogSchema, 
CreateExternalTable,
-    CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, 
Repartition,
-    TableScan, Values,
+    CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, 
LogicalPlanBuilder,
+    Repartition, TableScan, Values,
 };
 use datafusion::prelude::SessionContext;
 
@@ -334,6 +334,19 @@ impl AsLogicalPlan for LogicalPlanNode {
                     if_not_exists: create_extern_table.if_not_exists,
                 }))
             }
+            LogicalPlanType::CreateView(create_view) => {
+                let plan = create_view
+                    .input.clone().ok_or_else(|| 
BallistaError::General(String::from(
+                        "Protobuf deserialization error, CreateViewNode has 
invalid LogicalPlan input.",
+                    )))?
+                    .try_into_logical_plan(ctx, extension_codec)?;
+
+                Ok(LogicalPlan::CreateView(CreateView {
+                    name: create_view.name.clone(),
+                    input: Arc::new(plan),
+                    or_replace: create_view.or_replace,
+                }))
+            }
             LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
                 let pb_schema = 
(create_catalog_schema.schema.clone()).ok_or_else(|| {
                     BallistaError::General(String::from(
@@ -851,6 +864,22 @@ impl AsLogicalPlan for LogicalPlanNode {
                     )),
                 })
             }
+            LogicalPlan::CreateView(CreateView {
+                name,
+                input,
+                or_replace,
+            }) => Ok(protobuf::LogicalPlanNode {
+                logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
+                    protobuf::CreateViewNode {
+                        name: name.clone(),
+                        input: 
Some(Box::new(LogicalPlanNode::try_from_logical_plan(
+                            input,
+                            extension_codec,
+                        )?)),
+                        or_replace: *or_replace,
+                    },
+                ))),
+            }),
             LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
                 schema_name,
                 if_not_exists,
diff --git a/datafusion-examples/examples/custom_datasource.rs 
b/datafusion-examples/examples/custom_datasource.rs
index d8a908986..a9a8ef7aa 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -20,7 +20,7 @@ use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::dataframe::DataFrame;
-use datafusion::datasource::TableProvider;
+use datafusion::datasource::{TableProvider, TableType};
 use datafusion::error::Result;
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_plan::{Expr, LogicalPlanBuilder};
@@ -165,6 +165,10 @@ impl TableProvider for CustomDataSource {
         ]))
     }
 
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
diff --git a/datafusion/core/src/datasource/datasource.rs 
b/datafusion/core/src/datasource/datasource.rs
index f4fdc975d..8ab254525 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/core/src/datasource/datasource.rs
@@ -21,7 +21,7 @@ use std::any::Any;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use datafusion_expr::{TableProviderFilterPushDown, TableType};
+pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
 
 use crate::arrow::datatypes::SchemaRef;
 use crate::error::Result;
@@ -39,9 +39,7 @@ pub trait TableProvider: Sync + Send {
     fn schema(&self) -> SchemaRef;
 
     /// Get the type of this table for metadata/catalog purposes.
-    fn table_type(&self) -> TableType {
-        TableType::Base
-    }
+    fn table_type(&self) -> TableType;
 
     /// Create an ExecutionPlan that will scan the table.
     /// The table provider will be usually responsible of grouping
diff --git a/datafusion/core/src/datasource/empty.rs 
b/datafusion/core/src/datasource/empty.rs
index 5622d15a0..837cd7704 100644
--- a/datafusion/core/src/datasource/empty.rs
+++ b/datafusion/core/src/datasource/empty.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use arrow::datatypes::*;
 use async_trait::async_trait;
 
-use crate::datasource::TableProvider;
+use crate::datasource::{TableProvider, TableType};
 use crate::error::Result;
 use crate::logical_plan::Expr;
 use crate::physical_plan::project_schema;
@@ -51,6 +51,10 @@ impl TableProvider for EmptyTable {
         self.schema.clone()
     }
 
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 6881f674b..bde88b659 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -28,7 +28,7 @@ use crate::datasource::{
         avro::AvroFormat, csv::CsvFormat, json::JsonFormat, 
parquet::ParquetFormat,
         FileFormat,
     },
-    get_statistics_with_limit, TableProvider,
+    get_statistics_with_limit, TableProvider, TableType,
 };
 use crate::logical_expr::TableProviderFilterPushDown;
 use crate::{
@@ -298,6 +298,10 @@ impl TableProvider for ListingTable {
         Arc::clone(&self.table_schema)
     }
 
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
diff --git a/datafusion/core/src/datasource/memory.rs 
b/datafusion/core/src/datasource/memory.rs
index 72630b39c..adc26d2f4 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -27,7 +27,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 
-use crate::datasource::TableProvider;
+use crate::datasource::{TableProvider, TableType};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
 use crate::logical_plan::Expr;
@@ -127,6 +127,10 @@ impl TableProvider for MemTable {
         self.schema.clone()
     }
 
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
diff --git a/datafusion/core/src/datasource/mod.rs 
b/datafusion/core/src/datasource/mod.rs
index f4d059e3d..f3cc0a04e 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -24,12 +24,14 @@ pub mod file_format;
 pub mod listing;
 pub mod memory;
 pub mod object_store_registry;
+pub mod view;
 
 use futures::Stream;
 
 pub use self::datasource::TableProvider;
 use self::listing::PartitionedFile;
 pub use self::memory::MemTable;
+pub use self::view::ViewTable;
 use crate::arrow::datatypes::{Schema, SchemaRef};
 use crate::error::Result;
 pub use crate::logical_expr::TableType;
diff --git a/datafusion/core/src/datasource/view.rs 
b/datafusion/core/src/datasource/view.rs
new file mode 100644
index 000000000..2bb3b687b
--- /dev/null
+++ b/datafusion/core/src/datasource/view.rs
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! View data source which uses a LogicalPlan as its input.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+
+use crate::{
+    error::Result,
+    execution::context::SessionContext,
+    logical_plan::{Expr, LogicalPlan},
+    physical_plan::ExecutionPlan,
+};
+
+use crate::datasource::{TableProvider, TableType};
+
+/// An implementation of `TableProvider` that uses another logical plan.
+pub struct ViewTable {
+    /// Session context used to create the view's ExecutionPlan in `scan`
+    context: SessionContext,
+    /// LogicalPlan of the view
+    logical_plan: LogicalPlan,
+    /// Schema of the view, taken from the logical plan's schema
+    table_schema: SchemaRef,
+}
+
+impl ViewTable {
+    /// Create new view that is executed at query runtime.
+    /// Takes a `LogicalPlan` as input.
+    pub fn try_new(context: SessionContext, logical_plan: LogicalPlan) -> 
Result<Self> {
+        let table_schema = logical_plan.schema().as_ref().to_owned().into();
+
+        let view = Self {
+            context,
+            logical_plan,
+            table_schema,
+        };
+
+        Ok(view)
+    }
+}
+
+#[async_trait]
+impl TableProvider for ViewTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        Arc::clone(&self.table_schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::View
+    }
+
+    async fn scan(
+        &self,
+        _projection: &Option<Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        self.context.create_physical_plan(&self.logical_plan).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{assert_batches_eq, execution::context::SessionConfig};
+
+    use super::*;
+
+    #[tokio::test]
+    async fn query_view() -> Result<()> {
+        let session_ctx = SessionContext::with_config(
+            SessionConfig::new().with_information_schema(true),
+        );
+
+        session_ctx
+            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+            .await?
+            .collect()
+            .await?;
+
+        let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
+        session_ctx.sql(view_sql).await?.collect().await?;
+
+        let results = session_ctx.sql("SELECT * FROM information_schema.tables 
WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
+        assert_eq!(results[0].num_rows(), 1);
+
+        let results = session_ctx
+            .sql("SELECT * FROM xyz")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            "+---------+---------+---------+",
+            "| column1 | column2 | column3 |",
+            "+---------+---------+---------+",
+            "| 1       | 2       | 3       |",
+            "| 4       | 5       | 6       |",
+            "+---------+---------+---------+",
+        ];
+
+        assert_batches_eq!(expected, &results);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn query_view_with_projection() -> Result<()> {
+        let session_ctx = SessionContext::with_config(
+            SessionConfig::new().with_information_schema(true),
+        );
+
+        session_ctx
+            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+            .await?
+            .collect()
+            .await?;
+
+        let view_sql = "CREATE VIEW xyz AS SELECT column1, column2 FROM abc";
+        session_ctx.sql(view_sql).await?.collect().await?;
+
+        let results = session_ctx.sql("SELECT * FROM information_schema.tables 
WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
+        assert_eq!(results[0].num_rows(), 1);
+
+        let results = session_ctx
+            .sql("SELECT column1 FROM xyz")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            "+---------+",
+            "| column1 |",
+            "+---------+",
+            "| 1       |",
+            "| 4       |",
+            "+---------+",
+        ];
+
+        assert_batches_eq!(expected, &results);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn query_view_with_filter() -> Result<()> {
+        let session_ctx = SessionContext::with_config(
+            SessionConfig::new().with_information_schema(true),
+        );
+
+        session_ctx
+            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+            .await?
+            .collect()
+            .await?;
+
+        let view_sql = "CREATE VIEW xyz AS SELECT column1, column2 FROM abc";
+        session_ctx.sql(view_sql).await?.collect().await?;
+
+        let results = session_ctx.sql("SELECT * FROM information_schema.tables 
WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
+        assert_eq!(results[0].num_rows(), 1);
+
+        let results = session_ctx
+            .sql("SELECT column1 FROM xyz WHERE column2 = 5")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            "+---------+",
+            "| column1 |",
+            "+---------+",
+            "| 4       |",
+            "+---------+",
+        ];
+
+        assert_batches_eq!(expected, &results);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn query_join_views() -> Result<()> {
+        let session_ctx = SessionContext::with_config(
+            SessionConfig::new().with_information_schema(true),
+        );
+
+        session_ctx
+            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+            .await?
+            .collect()
+            .await?;
+
+        let view_sql = "CREATE VIEW xyz AS SELECT column1, column2 FROM abc";
+        session_ctx.sql(view_sql).await?.collect().await?;
+
+        let view_sql = "CREATE VIEW lmn AS SELECT column1, column3 FROM abc";
+        session_ctx.sql(view_sql).await?.collect().await?;
+
+        let results = session_ctx.sql("SELECT * FROM information_schema.tables 
WHERE table_type='VIEW' AND (table_name = 'xyz' OR table_name = 
'lmn')").await?.collect().await?;
+        assert_eq!(results[0].num_rows(), 2);
+
+        let results = session_ctx
+            .sql("SELECT * FROM xyz JOIN lmn USING (column1)")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            "+---------+---------+---------+",
+            "| column2 | column1 | column3 |",
+            "+---------+---------+---------+",
+            "| 2       | 1       | 3       |",
+            "| 5       | 4       | 6       |",
+            "+---------+---------+---------+",
+        ];
+
+        assert_batches_eq!(expected, &results);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn create_view_plan() -> Result<()> {
+        let session_ctx = SessionContext::with_config(
+            SessionConfig::new().with_information_schema(true),
+        );
+
+        session_ctx
+            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+            .await?
+            .collect()
+            .await?;
+
+        let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
+        session_ctx.sql(view_sql).await?.collect().await?;
+
+        let results = session_ctx
+            .sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            
"+---------------+--------------------------------------------------------+",
+            "| plan_type     | plan                                            
       |",
+            
"+---------------+--------------------------------------------------------+",
+            "| logical_plan  | CreateView: \"xyz\"                             
         |",
+            "|               |   Projection: #abc.column1, #abc.column2, 
#abc.column3 |",
+            "|               |     TableScan: abc projection=Some([0, 1, 2])   
       |",
+            "| physical_plan | EmptyExec: produce_one_row=false                
       |",
+            "|               |                                                 
       |",
+            
"+---------------+--------------------------------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &results);
+
+        let results = session_ctx
+            .sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc WHERE column2 = 
5")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            
"+---------------+--------------------------------------------------------+",
+            "| plan_type     | plan                                            
       |",
+            
"+---------------+--------------------------------------------------------+",
+            "| logical_plan  | CreateView: \"xyz\"                             
         |",
+            "|               |   Projection: #abc.column1, #abc.column2, 
#abc.column3 |",
+            "|               |     Filter: #abc.column2 = Int64(5)             
       |",
+            "|               |       TableScan: abc projection=Some([0, 1, 2]) 
       |",
+            "| physical_plan | EmptyExec: produce_one_row=false                
       |",
+            "|               |                                                 
       |",
+            
"+---------------+--------------------------------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &results);
+
+        let results = session_ctx
+            .sql("EXPLAIN CREATE VIEW xyz AS SELECT column1, column2 FROM abc 
WHERE column2 = 5")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            "+---------------+----------------------------------------------+",
+            "| plan_type     | plan                                         |",
+            "+---------------+----------------------------------------------+",
+            "| logical_plan  | CreateView: \"xyz\"                            
|",
+            "|               |   Projection: #abc.column1, #abc.column2     |",
+            "|               |     Filter: #abc.column2 = Int64(5)          |",
+            "|               |       TableScan: abc projection=Some([0, 1]) |",
+            "| physical_plan | EmptyExec: produce_one_row=false             |",
+            "|               |                                              |",
+            "+---------------+----------------------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &results);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn create_or_replace_view() -> Result<()> {
+        let session_ctx = SessionContext::with_config(
+            SessionConfig::new().with_information_schema(true),
+        );
+
+        session_ctx
+            .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+            .await?
+            .collect()
+            .await?;
+
+        let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
+        session_ctx.sql(view_sql).await?.collect().await?;
+
+        let view_sql = "CREATE OR REPLACE VIEW xyz AS SELECT column1 FROM abc";
+        session_ctx.sql(view_sql).await?.collect().await?;
+
+        let results = session_ctx.sql("SELECT * FROM information_schema.tables 
WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
+        assert_eq!(results[0].num_rows(), 1);
+
+        let results = session_ctx
+            .sql("SELECT * FROM xyz")
+            .await?
+            .collect()
+            .await?;
+
+        let expected = vec![
+            "+---------+",
+            "| column1 |",
+            "+---------+",
+            "| 1       |",
+            "| 4       |",
+            "+---------+",
+        ];
+
+        assert_batches_eq!(expected, &results);
+
+        Ok(())
+    }
+}
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index 9f7dcd27e..7ca9a1e4d 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -30,7 +30,7 @@ use crate::{
             parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
             FileFormat,
         },
-        MemTable,
+        MemTable, ViewTable,
     },
     logical_plan::{PlanType, ToStringifiedPlan},
     optimizer::eliminate_filter::EliminateFilter,
@@ -62,7 +62,7 @@ use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 use crate::logical_plan::{
     CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
-    DropTable, FileType, FunctionRegistry, LogicalPlan, LogicalPlanBuilder,
+    CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan, 
LogicalPlanBuilder,
     UNNAMED_TABLE,
 };
 use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
@@ -317,6 +317,38 @@ impl SessionContext {
                 }
             }
 
+            LogicalPlan::CreateView(CreateView {
+                name,
+                input,
+                or_replace,
+            }) => {
+                let view = self.table(name.as_str());
+
+                match (or_replace, view) {
+                    (true, Ok(_)) => {
+                        self.deregister_table(name.as_str())?;
+                        let plan = self.optimize(&input)?;
+                        let table =
+                            Arc::new(ViewTable::try_new(self.clone(), 
plan.clone())?);
+
+                        self.register_table(name.as_str(), table)?;
+                        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                    }
+                    (_, Err(_)) => {
+                        let plan = self.optimize(&input)?;
+                        let table =
+                            Arc::new(ViewTable::try_new(self.clone(), 
plan.clone())?);
+
+                        self.register_table(name.as_str(), table)?;
+                        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+                    }
+                    (false, Ok(_)) => Err(DataFusionError::Execution(format!(
+                        "Table '{:?}' already exists",
+                        name
+                    ))),
+                }
+            }
+
             LogicalPlan::DropTable(DropTable {
                 name, if_exists, ..
             }) => {
diff --git a/datafusion/core/src/logical_plan/mod.rs 
b/datafusion/core/src/logical_plan/mod.rs
index 55295e22e..048b7b3ec 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -60,8 +60,9 @@ pub use expr_visitor::{ExprVisitable, ExpressionVisitor, 
Recursion};
 pub use plan::{provider_as_source, source_as_provider};
 pub use plan::{
     CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
-    CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType, 
Limit,
-    LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, 
StringifiedPlan,
-    Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, 
Values,
+    CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, 
JoinType,
+    Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
+    StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
+    UserDefinedLogicalNode, Values,
 };
 pub use registry::FunctionRegistry;
diff --git a/datafusion/core/src/logical_plan/plan.rs 
b/datafusion/core/src/logical_plan/plan.rs
index 08d1fa120..e1adb4939 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -25,8 +25,8 @@ pub use crate::logical_expr::{
     logical_plan::{
         display::{GraphvizVisitor, IndentVisitor},
         Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, 
CreateExternalTable,
-        CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, 
Extension,
-        FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
+        CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, 
Explain,
+        Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, 
LogicalPlan,
         Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
         StringifiedPlan, Subquery, SubqueryAlias, TableScan, 
ToStringifiedPlan, Union,
         UserDefinedLogicalNode, Values, Window,
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs 
b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index 967ef58b3..defb42289 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -224,6 +224,7 @@ fn optimize(plan: &LogicalPlan, execution_props: 
&ExecutionProps) -> Result<Logi
         | LogicalPlan::Explain { .. }
         | LogicalPlan::Analyze { .. }
         | LogicalPlan::CreateMemoryTable(_)
+        | LogicalPlan::CreateView(_)
         | LogicalPlan::CreateCatalogSchema(_)
         | LogicalPlan::CreateCatalog(_)
         | LogicalPlan::DropTable(_)
diff --git a/datafusion/core/src/optimizer/filter_push_down.rs 
b/datafusion/core/src/optimizer/filter_push_down.rs
index 0fd107b40..795f86392 100644
--- a/datafusion/core/src/optimizer/filter_push_down.rs
+++ b/datafusion/core/src/optimizer/filter_push_down.rs
@@ -560,7 +560,7 @@ mod tests {
     use std::sync::Arc;
 
     use super::*;
-    use crate::datasource::TableProvider;
+    use crate::datasource::{TableProvider, TableType};
     use crate::logical_plan::plan::provider_as_source;
     use crate::logical_plan::{
         and, col, lit, sum, union_with_alias, DFSchema, Expr, 
LogicalPlanBuilder,
@@ -1354,6 +1354,10 @@ mod tests {
             ]))
         }
 
+        fn table_type(&self) -> TableType {
+            TableType::Base
+        }
+
         async fn scan(
             &self,
             _: &Option<Vec<usize>>,
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs 
b/datafusion/core/src/optimizer/projection_push_down.rs
index 0979d8f5b..9cbec1e90 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -471,6 +471,7 @@ fn optimize_plan(
         | LogicalPlan::Sort { .. }
         | LogicalPlan::CreateExternalTable(_)
         | LogicalPlan::CreateMemoryTable(_)
+        | LogicalPlan::CreateView(_)
         | LogicalPlan::CreateCatalogSchema(_)
         | LogicalPlan::CreateCatalog(_)
         | LogicalPlan::DropTable(_)
diff --git a/datafusion/core/src/optimizer/utils.rs 
b/datafusion/core/src/optimizer/utils.rs
index 2c56b5f89..0e54fd93e 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -25,9 +25,9 @@ use datafusion_expr::logical_plan::{
 };
 
 use crate::logical_plan::{
-    and, build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Expr, 
ExprVisitable,
-    Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion,
-    Repartition, Union, Values,
+    and, build_join_schema, Column, CreateMemoryTable, CreateView, 
DFSchemaRef, Expr,
+    ExprVisitable, Limit, LogicalPlan, LogicalPlanBuilder, Operator, 
Partitioning,
+    Recursion, Repartition, Union, Values,
 };
 use crate::prelude::lit;
 use crate::scalar::ScalarValue;
@@ -258,6 +258,13 @@ pub fn from_plan(
             name: name.clone(),
             if_not_exists: *if_not_exists,
         })),
+        LogicalPlan::CreateView(CreateView {
+            name, or_replace, ..
+        }) => Ok(LogicalPlan::CreateView(CreateView {
+            input: Arc::new(inputs[0].clone()),
+            name: name.clone(),
+            or_replace: *or_replace,
+        })),
         LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
             node: e.node.from_template(expr, inputs),
         })),
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index f6b3842f2..47829ad79 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -880,7 +880,7 @@ impl DefaultPhysicalPlanner {
                         "Unsupported logical plan: CreateCatalog".to_string(),
                     ))
                 }
-                | LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable 
(_) => {
+                | LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable 
(_) | LogicalPlan::CreateView(_) => {
                     // Create a dummy exec.
                     Ok(Arc::new(EmptyExec::new(
                         false,
diff --git a/datafusion/core/src/sql/planner.rs 
b/datafusion/core/src/sql/planner.rs
index 143a0617a..4c6d26b66 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -31,8 +31,8 @@ use crate::logical_plan::{
     and, builder::expand_qualified_wildcard, builder::expand_wildcard, col, 
lit,
     normalize_col, normalize_col_with_schemas, union_with_alias, Column, 
CreateCatalog,
     CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable,
-    CreateMemoryTable, DFSchema, DFSchemaRef, DropTable, Expr, FileType, 
LogicalPlan,
-    LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan,
+    CreateMemoryTable, CreateView, DFSchema, DFSchemaRef, DropTable, Expr, 
FileType,
+    LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema, 
ToStringifiedPlan,
 };
 use crate::optimizer::utils::exprlist_to_columns;
 use crate::prelude::JoinType;
@@ -174,6 +174,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     if_not_exists,
                 }))
             }
+            Statement::CreateView {
+                or_replace,
+                name,
+                columns,
+                query,
+                with_options,
+                ..
+            } if columns.is_empty() && with_options.is_empty() => {
+                let plan = self.query_to_plan(*query, &mut HashMap::new())?;
+                Ok(LogicalPlan::CreateView(CreateView {
+                    name: name.to_string(),
+                    input: Arc::new(plan),
+                    or_replace,
+                }))
+            }
             Statement::CreateTable { .. } => 
Err(DataFusionError::NotImplemented(
                 "Only `CREATE TABLE table_name AS SELECT ...` statement is 
supported"
                     .to_string(),
diff --git a/datafusion/core/tests/custom_sources.rs 
b/datafusion/core/tests/custom_sources.rs
index 81e4706de..f1356f7d4 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -24,7 +24,10 @@ use datafusion::from_slice::FromSlice;
 use datafusion::physical_plan::empty::EmptyExec;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::scalar::ScalarValue;
-use datafusion::{datasource::TableProvider, physical_plan::collect};
+use datafusion::{
+    datasource::{TableProvider, TableType},
+    physical_plan::collect,
+};
 use datafusion::{error::Result, physical_plan::DisplayFormatType};
 
 use datafusion::execution::context::{SessionContext, TaskContext};
@@ -192,6 +195,10 @@ impl TableProvider for CustomTableProvider {
         TEST_CUSTOM_SCHEMA_REF!()
     }
 
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
diff --git a/datafusion/core/tests/provider_filter_pushdown.rs 
b/datafusion/core/tests/provider_filter_pushdown.rs
index 49cd70143..c8fe483ea 100644
--- a/datafusion/core/tests/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/provider_filter_pushdown.rs
@@ -19,7 +19,7 @@ use arrow::array::{as_primitive_array, Int32Builder, 
UInt64Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
-use datafusion::datasource::datasource::TableProvider;
+use datafusion::datasource::datasource::{TableProvider, TableType};
 use datafusion::error::Result;
 use datafusion::execution::context::{SessionContext, TaskContext};
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
@@ -132,6 +132,10 @@ impl TableProvider for CustomProvider {
         self.zero_batch.schema()
     }
 
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
     async fn scan(
         &self,
         _: &Option<Vec<usize>>,
diff --git a/datafusion/core/tests/statistics.rs 
b/datafusion/core/tests/statistics.rs
index 031506704..99b53a62d 100644
--- a/datafusion/core/tests/statistics.rs
+++ b/datafusion/core/tests/statistics.rs
@@ -21,7 +21,7 @@ use std::{any::Any, sync::Arc};
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::{
-    datasource::TableProvider,
+    datasource::{TableProvider, TableType},
     error::Result,
     logical_plan::Expr,
     physical_plan::{
@@ -68,6 +68,10 @@ impl TableProvider for StatisticsValidation {
         Arc::clone(&self.schema)
     }
 
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
     async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
diff --git a/datafusion/expr/src/logical_plan/mod.rs 
b/datafusion/expr/src/logical_plan/mod.rs
index a37729f7d..3681ff14e 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -21,10 +21,10 @@ mod plan;
 
 pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, 
CreateExternalTable,
-    CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, 
Extension, FileType,
-    Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, 
PlanType,
-    PlanVisitor, Projection, Repartition, Sort, StringifiedPlan, Subquery, 
SubqueryAlias,
-    TableScan, ToStringifiedPlan, Union, Values, Window,
+    CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation, 
Explain,
+    Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit, 
LogicalPlan,
+    Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort, 
StringifiedPlan,
+    Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values, 
Window,
 };
 
 pub use display::display_schema;
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index 579898dbe..ab96dbe73 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -77,6 +77,8 @@ pub enum LogicalPlan {
     CreateExternalTable(CreateExternalTable),
     /// Creates an in memory table.
     CreateMemoryTable(CreateMemoryTable),
+    /// Creates a new view.
+    CreateView(CreateView),
     /// Creates a new catalog schema.
     CreateCatalogSchema(CreateCatalogSchema),
     /// Creates a new catalog (aka "Database").
@@ -124,9 +126,8 @@ impl LogicalPlan {
             LogicalPlan::Analyze(analyze) => &analyze.schema,
             LogicalPlan::Extension(extension) => extension.node.schema(),
             LogicalPlan::Union(Union { schema, .. }) => schema,
-            LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => 
{
-                input.schema()
-            }
+            LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
+            | LogicalPlan::CreateView(CreateView { input, .. }) => 
input.schema(),
             LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. 
}) => {
                 schema
             }
@@ -185,6 +186,7 @@ impl LogicalPlan {
             | LogicalPlan::Repartition(Repartition { input, .. })
             | LogicalPlan::Sort(Sort { input, .. })
             | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
+            | LogicalPlan::CreateView(CreateView { input, .. })
             | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
             LogicalPlan::DropTable(_) => vec![],
         }
@@ -235,6 +237,7 @@ impl LogicalPlan {
             | LogicalPlan::Limit(_)
             | LogicalPlan::CreateExternalTable(_)
             | LogicalPlan::CreateMemoryTable(_)
+            | LogicalPlan::CreateView(_)
             | LogicalPlan::CreateCatalogSchema(_)
             | LogicalPlan::CreateCatalog(_)
             | LogicalPlan::DropTable(_)
@@ -266,7 +269,8 @@ impl LogicalPlan {
             LogicalPlan::Union(Union { inputs, .. }) => 
inputs.iter().collect(),
             LogicalPlan::Explain(explain) => vec![&explain.plan],
             LogicalPlan::Analyze(analyze) => vec![&analyze.input],
-            LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => 
{
+            LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
+            | LogicalPlan::CreateView(CreateView { input, .. }) => {
                 vec![input]
             }
             // plans without inputs
@@ -405,7 +409,8 @@ impl LogicalPlan {
             LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
                 input.accept(visitor)?
             }
-            LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => 
{
+            LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
+            | LogicalPlan::CreateView(CreateView { input, .. }) => {
                 input.accept(visitor)?
             }
             LogicalPlan::Extension(extension) => {
@@ -793,6 +798,9 @@ impl LogicalPlan {
                     }) => {
                         write!(f, "CreateMemoryTable: {:?}", name)
                     }
+                    LogicalPlan::CreateView(CreateView { name, .. }) => {
+                        write!(f, "CreateView: {:?}", name)
+                    }
                     LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
                         schema_name,
                         ..
@@ -1036,6 +1044,17 @@ pub struct CreateMemoryTable {
     pub if_not_exists: bool,
 }
 
+/// Creates a view.
+#[derive(Clone)]
+pub struct CreateView {
+    /// The view name
+    pub name: String,
+    /// The logical plan of the query that defines the view
+    pub input: Arc<LogicalPlan>,
+    /// Whether `OR REPLACE` was specified in the `CREATE VIEW` statement
+    pub or_replace: bool,
+}
+
 /// Types of files to parse as DataFrames
 #[derive(Debug, Clone, Copy, PartialEq)]
 pub enum FileType {

Reply via email to