This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f3722c0af8 Add `SQLOptions` for controlling allowed SQL statements,
update docs (#7333)
f3722c0af8 is described below
commit f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Aug 22 08:43:47 2023 -0400
Add `SQLOptions` for controlling allowed SQL statements, update docs (#7333)
* Add `SQLOptions` for controlling allowed SQL statements, update docs
* fix docs
---
datafusion/core/src/execution/context.rs | 263 ++++++++++++++++++++++++-------
datafusion/core/src/prelude.rs | 2 +-
datafusion/core/tests/sql/mod.rs | 1 +
datafusion/core/tests/sql/sql_api.rs | 116 ++++++++++++++
4 files changed, 326 insertions(+), 56 deletions(-)
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index c0a0134fed..c97f770ab3 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -28,7 +28,11 @@ use crate::{
optimizer::optimizer::Optimizer,
physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
};
-use datafusion_common::{alias::AliasGenerator, not_impl_err, plan_err};
+use datafusion_common::{
+ alias::AliasGenerator,
+ not_impl_err, plan_err,
+ tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
+};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
@@ -163,12 +167,14 @@ where
/// * Register a custom data source that can be referenced from a SQL query.
/// * Execution a SQL query
///
+/// # Example: DataFrame API
+///
/// The following example demonstrates how to use the context to execute a
query against a CSV
/// data source using the DataFrame API:
///
/// ```
/// use datafusion::prelude::*;
-/// # use datafusion::error::Result;
+/// # use datafusion::{error::Result, assert_batches_eq};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
@@ -176,22 +182,49 @@ where
/// let df = df.filter(col("a").lt_eq(col("b")))?
/// .aggregate(vec![col("a")], vec![min(col("b"))])?
/// .limit(0, Some(100))?;
-/// let results = df.collect();
+/// let results = df
+/// .collect()
+/// .await?;
+/// assert_batches_eq!(
+/// &[
+/// "+---+----------------+",
+/// "| a | MIN(?table?.b) |",
+/// "+---+----------------+",
+/// "| 1 | 2 |",
+/// "+---+----------------+",
+/// ],
+/// &results
+/// );
/// # Ok(())
/// # }
/// ```
///
+/// # Example: SQL API
+///
/// The following example demonstrates how to execute the same query using SQL:
///
/// ```
/// use datafusion::prelude::*;
-///
-/// # use datafusion::error::Result;
+/// # use datafusion::{error::Result, assert_batches_eq};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let mut ctx = SessionContext::new();
/// ctx.register_csv("example", "tests/data/example.csv",
CsvReadOptions::new()).await?;
-/// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT
100").await?;
+/// let results = ctx
+/// .sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")
+/// .await?
+/// .collect()
+/// .await?;
+/// assert_batches_eq!(
+/// &[
+/// "+---+----------------+",
+/// "| a | MIN(example.b) |",
+/// "+---+----------------+",
+/// "| 1 | 2 |",
+/// "+---+----------------+",
+/// ],
+/// &results
+/// );
/// # Ok(())
/// # }
/// ```
@@ -342,22 +375,82 @@ impl SessionContext {
self.state.read().config.clone()
}
- /// Creates a [`DataFrame`] that will execute a SQL query.
+ /// Creates a [`DataFrame`] from SQL query text.
///
/// Note: This API implements DDL statements such as `CREATE TABLE` and
/// `CREATE VIEW` and DML statements such as `INSERT INTO` with in-memory
- /// default implementations.
+ /// default implementations. Use [`Self::sql_with_options`] to restrict which statements may run.
+ ///
+ /// # Example: Running SQL queries
+ ///
+ /// See the examples on [`Self`].
///
- /// If this is not desirable, consider using [`SessionState::create_logical_plan()`] which
- /// does not mutate the state based on such statements.
+ /// # Example: Creating a Table with SQL
+ ///
+ /// ```
+ /// use datafusion::prelude::*;
+ /// # use datafusion::{error::Result, assert_batches_eq};
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<()> {
+ /// let mut ctx = SessionContext::new();
+ /// ctx
+ /// .sql("CREATE TABLE foo (x INTEGER)")
+ /// .await?
+ /// .collect()
+ /// .await?;
+ /// assert!(ctx.table_exist("foo").unwrap());
+ /// # Ok(())
+ /// # }
+ /// ```
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
- // create a query planner
+ self.sql_with_options(sql, SQLOptions::new()).await
+ }
+
+ /// Creates a [`DataFrame`] from SQL query text, first checking with
+ /// [`SQLOptions::verify_plan`] that the query is allowed by `options`.
+ ///
+ /// # Example: Preventing Creating a Table with SQL
+ ///
+ /// If you want to avoid creating tables, or modifying data or the
+ /// session, set [`SQLOptions`] appropriately:
+ ///
+ /// ```
+ /// use datafusion::prelude::*;
+ /// # use datafusion::{error::Result};
+ /// # use datafusion::physical_plan::collect;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<()> {
+ /// let mut ctx = SessionContext::new();
+ /// let options = SQLOptions::new()
+ /// .with_allow_ddl(false);
+ /// let err = ctx.sql_with_options("CREATE TABLE foo (x INTEGER)", options)
+ /// .await
+ /// .unwrap_err();
+ /// assert_eq!(
+ /// err.to_string(),
+ /// "Error during planning: DDL not supported: CreateMemoryTable"
+ /// );
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub async fn sql_with_options(
+ &self,
+ sql: &str,
+ options: SQLOptions,
+ ) -> Result<DataFrame> {
let plan = self.state().create_logical_plan(sql).await?;
+ options.verify_plan(&plan)?; // reject disallowed nodes before execute_logical_plan sees the plan
self.execute_logical_plan(plan).await
}
- /// Execute the [`LogicalPlan`], return a [`DataFrame`]
+ /// Execute the [`LogicalPlan`], return a [`DataFrame`]. This API
+ /// is not feature-limited (so all SQL such as `CREATE TABLE` and
+ /// `COPY` will be run).
+ ///
+ /// If you wish to limit the type of plan that can be run from
+ /// SQL, see [`Self::sql_with_options`] and
+ /// [`SQLOptions::verify_plan`].
pub async fn execute_logical_plan(&self, plan: LogicalPlan) ->
Result<DataFrame> {
match plan {
LogicalPlan::Ddl(ddl) => match ddl {
@@ -1304,7 +1397,7 @@ impl FunctionRegistry for SessionContext {
/// A planner used to add extensions to DataFusion logical and physical plans.
#[async_trait]
pub trait QueryPlanner {
- /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
+ /// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
@@ -1317,7 +1410,7 @@ struct DefaultQueryPlanner {}
#[async_trait]
impl QueryPlanner for DefaultQueryPlanner {
- /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
+ /// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
@@ -1628,7 +1721,8 @@ impl SessionState {
&mut self.table_factories
}
- /// Convert a SQL string into an AST Statement
+ /// Parse a SQL string into a DataFusion-specific AST
+ /// [`Statement`]. See [`SessionContext::sql`] for running queries.
pub fn sql_to_statement(
&self,
sql: &str,
@@ -1787,9 +1881,15 @@ impl SessionState {
query.statement_to_plan(statement)
}
- /// Creates a [`LogicalPlan`] from the provided SQL string
+ /// Creates a [`LogicalPlan`] from the provided SQL string. This
+ /// interface will plan any SQL DataFusion supports, including DDL
+ /// like `CREATE TABLE` and DML like `COPY` (which can write to
+ /// local files).
///
- /// See [`SessionContext::sql`] for a higher-level interface that also
handles DDL
+ /// See [`SessionContext::sql`] and
+ /// [`SessionContext::sql_with_options`] for a higher-level
+ /// interface that handles DDL and verification of allowed
+ /// statements.
pub async fn create_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {
let dialect = self.config.options().sql_parser.dialect.as_str();
let statement = self.sql_to_statement(sql, dialect)?;
@@ -1870,7 +1970,11 @@ impl SessionState {
/// Creates a physical plan from a logical plan.
///
- /// Note: this first calls [`Self::optimize`] on the provided plan
+ /// Note: this first calls [`Self::optimize`] on the provided
+ /// plan.
+ ///
+ /// This function will error for [`LogicalPlan`]s such as catalog
+ /// DDL (e.g. `CREATE TABLE`), which must be handled by another layer.
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
@@ -2095,6 +2199,92 @@ impl SerializerRegistry for EmptySerializerRegistry {
}
}
+/// Describes which kinds of SQL statements (DDL, DML, and other statements) can be run.
+///
+/// See [`SessionContext::sql_with_options`] for more details.
+#[derive(Clone, Debug, Copy)]
+pub struct SQLOptions {
+ /// See [`Self::with_allow_ddl`]
+ allow_ddl: bool,
+ /// See [`Self::with_allow_dml`]
+ allow_dml: bool,
+ /// See [`Self::with_allow_statements`]
+ allow_statements: bool,
+}
+
+impl Default for SQLOptions {
+ fn default() -> Self {
+ Self {
+ allow_ddl: true,
+ allow_dml: true,
+ allow_statements: true,
+ }
+ }
+}
+
+impl SQLOptions {
+ /// Create a new `SQLOptions` with default values (DDL, DML, and statements all allowed)
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Should DDL commands (e.g. `CREATE TABLE` and `CREATE VIEW`) be run? Defaults to `true`.
+ pub fn with_allow_ddl(mut self, allow: bool) -> Self {
+ self.allow_ddl = allow;
+ self
+ }
+
+ /// Should DML data modification commands (e.g. `INSERT` and `COPY`) be run? Defaults to `true`.
+ pub fn with_allow_dml(mut self, allow: bool) -> Self {
+ self.allow_dml = allow;
+ self
+ }
+
+ /// Should statements such as `SET VARIABLE` and `BEGIN TRANSACTION` be run? Defaults to `true`.
+ pub fn with_allow_statements(mut self, allow: bool) -> Self {
+ self.allow_statements = allow;
+ self
+ }
+
+ /// Return an error if the [`LogicalPlan`] has any nodes that are
+ /// incompatible with this [`SQLOptions`].
+ pub fn verify_plan(&self, plan: &LogicalPlan) -> Result<()> {
+ plan.visit(&mut BadPlanVisitor::new(self))?;
+ Ok(())
+ }
+}
+/// Plan visitor used by [`SQLOptions::verify_plan`] to reject plan nodes the options disallow.
+struct BadPlanVisitor<'a> {
+ options: &'a SQLOptions,
+}
+impl<'a> BadPlanVisitor<'a> {
+ fn new(options: &'a SQLOptions) -> Self {
+ Self { options }
+ }
+}
+
+impl<'a> TreeNodeVisitor for BadPlanVisitor<'a> {
+ type N = LogicalPlan;
+ /// Returns a planning error for DDL, DML (including `COPY`), or statement nodes disallowed by `options`; any other node continues the walk.
+ fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
+ match node {
+ LogicalPlan::Ddl(ddl) if !self.options.allow_ddl => {
+ plan_err!("DDL not supported: {}", ddl.name())
+ }
+ LogicalPlan::Dml(dml) if !self.options.allow_dml => {
+ plan_err!("DML not supported: {}", dml.op)
+ }
+ LogicalPlan::Copy(_) if !self.options.allow_dml => {
+ plan_err!("DML not supported: COPY")
+ }
+ LogicalPlan::Statement(stmt) if !self.options.allow_statements => {
+ plan_err!("Statement not supported: {}", stmt.name())
+ }
+ _ => Ok(VisitRecursion::Continue),
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -2646,43 +2836,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn unsupported_sql_returns_error() -> Result<()> {
- let ctx = SessionContext::new();
- ctx.register_table("test", test::table_with_sequence(1, 1).unwrap())
- .unwrap();
- let state = ctx.state();
-
- // create view
- let sql = "create view test_view as select * from test";
- let plan = state.create_logical_plan(sql).await;
- let physical_plan = state.create_physical_plan(&plan.unwrap()).await;
- assert!(physical_plan.is_err());
- assert_eq!(
- format!("{}", physical_plan.unwrap_err()),
- "This feature is not implemented: Unsupported logical plan:
CreateView"
- );
- // // drop view
- let sql = "drop view test_view";
- let plan = state.create_logical_plan(sql).await;
- let physical_plan = state.create_physical_plan(&plan.unwrap()).await;
- assert!(physical_plan.is_err());
- assert_eq!(
- format!("{}", physical_plan.unwrap_err()),
- "This feature is not implemented: Unsupported logical plan:
DropView"
- );
- // // drop table
- let sql = "drop table test";
- let plan = state.create_logical_plan(sql).await;
- let physical_plan = state.create_physical_plan(&plan.unwrap()).await;
- assert!(physical_plan.is_err());
- assert_eq!(
- format!("{}", physical_plan.unwrap_err()),
- "This feature is not implemented: Unsupported logical plan:
DropTable"
- );
- Ok(())
- }
-
struct MyPhysicalPlanner {}
#[async_trait]
diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs
index d01d9c2390..3782feca19 100644
--- a/datafusion/core/src/prelude.rs
+++ b/datafusion/core/src/prelude.rs
@@ -26,7 +26,7 @@
//! ```
pub use crate::dataframe::DataFrame;
-pub use crate::execution::context::{SessionConfig, SessionContext};
+pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext};
pub use crate::execution::options::{
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
};
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index c1adcf9d0a..35423234db 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -96,6 +96,7 @@ pub mod projection;
pub mod references;
pub mod repartition;
pub mod select;
+mod sql_api;
pub mod subqueries;
pub mod timestamp;
pub mod udf;
diff --git a/datafusion/core/tests/sql/sql_api.rs
b/datafusion/core/tests/sql/sql_api.rs
new file mode 100644
index 0000000000..4f249a8656
--- /dev/null
+++ b/datafusion/core/tests/sql/sql_api.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::prelude::*;
+use tempfile::TempDir;
+
+#[tokio::test]
+async fn unsupported_ddl_returns_error() {
+ // Verify SessionContext::sql_with_options errors appropriately when DDL is disallowed
+ let ctx = SessionContext::new();
+ ctx.sql("CREATE TABLE test (x int)").await.unwrap();
+
+ // disallow ddl
+ let options = SQLOptions::new().with_allow_ddl(false);
+
+ let sql = "create view test_view as select * from test";
+ let df = ctx.sql_with_options(sql, options).await;
+ assert_eq!(
+ df.unwrap_err().to_string(),
+ "Error during planning: DDL not supported: CreateView"
+ );
+
+ // allow ddl: the same statement is now accepted
+ let options = options.with_allow_ddl(true);
+ ctx.sql_with_options(sql, options).await.unwrap();
+}
+
+#[tokio::test]
+async fn unsupported_dml_returns_error() {
+ let ctx = SessionContext::new();
+ ctx.sql("CREATE TABLE test (x int)").await.unwrap();
+
+ let options = SQLOptions::new().with_allow_dml(false); // disallow dml
+
+ let sql = "insert into test values (1)";
+ let df = ctx.sql_with_options(sql, options).await;
+ assert_eq!(
+ df.unwrap_err().to_string(),
+ "Error during planning: DML not supported: Insert Into"
+ );
+
+ let options = options.with_allow_dml(true); // allow dml: the same INSERT is now accepted
+ ctx.sql_with_options(sql, options).await.unwrap();
+}
+
+#[tokio::test]
+async fn unsupported_copy_returns_error() {
+ let tmpdir = TempDir::new().unwrap();
+ let tmpfile = tmpdir.path().join("foo.parquet"); // COPY target inside a temp dir
+
+ let ctx = SessionContext::new();
+ ctx.sql("CREATE TABLE test (x int)").await.unwrap(); // NOTE(review): the COPY below reads VALUES, not this table
+
+ let options = SQLOptions::new().with_allow_dml(false); // COPY is gated by allow_dml
+
+ let sql = format!("copy (values(1)) to '{}'", tmpfile.to_string_lossy());
+ let df = ctx.sql_with_options(&sql, options).await;
+ assert_eq!(
+ df.unwrap_err().to_string(),
+ "Error during planning: DML not supported: COPY"
+ );
+
+ let options = options.with_allow_dml(true); // allow dml: the same COPY is now accepted
+ ctx.sql_with_options(&sql, options).await.unwrap();
+}
+
+#[tokio::test]
+async fn unsupported_statement_returns_error() {
+ let ctx = SessionContext::new();
+ ctx.sql("CREATE TABLE test (x int)").await.unwrap(); // NOTE(review): the SET below does not use this table
+
+ let options = SQLOptions::new().with_allow_statements(false); // disallow statements such as SET
+
+ let sql = "set datafusion.execution.batch_size = 5";
+ let df = ctx.sql_with_options(sql, options).await;
+ assert_eq!(
+ df.unwrap_err().to_string(),
+ "Error during planning: Statement not supported: SetVariable"
+ );
+
+ let options = options.with_allow_statements(true); // allow statements: the same SET is now accepted
+ ctx.sql_with_options(sql, options).await.unwrap();
+}
+
+#[tokio::test]
+async fn ddl_can_not_be_planned_by_session_state() {
+ let ctx = SessionContext::new();
+
+ // make a table via SQL
+ ctx.sql("CREATE TABLE test (x int)").await.unwrap();
+
+ let state = ctx.state();
+
+ // a logical plan can be created for catalog DDL, but not a physical plan
+ let sql = "drop table test";
+ let plan = state.create_logical_plan(sql).await.unwrap();
+ let physical_plan = state.create_physical_plan(&plan).await;
+ assert_eq!(
+ physical_plan.unwrap_err().to_string(),
+ "This feature is not implemented: Unsupported logical plan: DropTable"
+ );
+}