This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 861a2364bd feat: add CliSessionContext trait for cli (#10890)
861a2364bd is described below

commit 861a2364bdf04854482384c29d9b64962da377fe
Author: Trent Hauck <[email protected]>
AuthorDate: Mon Jun 17 10:16:20 2024 -0700

    feat: add CliSessionContext trait for cli (#10890)
    
    use CliSessionContext trait for cli
---
 datafusion-cli/examples/cli-session-context.rs | 97 +++++++++++++++++++++++++
 datafusion-cli/src/cli_context.rs              | 98 ++++++++++++++++++++++++++
 datafusion-cli/src/command.rs                  |  4 +-
 datafusion-cli/src/exec.rs                     | 28 ++++----
 datafusion-cli/src/lib.rs                      |  1 +
 datafusion-cli/src/object_storage.rs           | 55 ++-------------
 6 files changed, 220 insertions(+), 63 deletions(-)

diff --git a/datafusion-cli/examples/cli-session-context.rs b/datafusion-cli/examples/cli-session-context.rs
new file mode 100644
index 0000000000..8da52ed84a
--- /dev/null
+++ b/datafusion-cli/examples/cli-session-context.rs
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shows an example of a custom session context that unions the input plan with itself.
+//! To run this example, use `cargo run --example cli-session-context` from within the `datafusion-cli` directory.
+
+use std::sync::Arc;
+
+use datafusion::{
+    dataframe::DataFrame,
+    error::DataFusionError,
+    execution::{context::SessionState, TaskContext},
+    logical_expr::{LogicalPlan, LogicalPlanBuilder},
+    prelude::SessionContext,
+};
+use datafusion_cli::{
+    cli_context::CliSessionContext, exec::exec_from_repl, 
print_options::PrintOptions,
+};
+use object_store::ObjectStore;
+
+/// A toy custom session context that unions every input plan with itself.
+struct MyUnionerContext {
+    ctx: SessionContext, // inner context that actually plans and executes queries
+}
+
+impl Default for MyUnionerContext {
+    /// Wraps a freshly created `SessionContext`.
+    fn default() -> Self {
+        let ctx = SessionContext::new();
+        Self { ctx }
+    }
+}
+
+#[async_trait::async_trait]
+impl CliSessionContext for MyUnionerContext {
+    fn task_ctx(&self) -> Arc<TaskContext> {
+        self.ctx.task_ctx()
+    }
+
+    fn session_state(&self) -> SessionState {
+        self.ctx.state()
+    }
+
+    fn register_object_store(
+        &self,
+        url: &url::Url,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore + 'static>> {
+        self.ctx.register_object_store(url, object_store)
+    }
+
+    // Delegate rather than panic: the CLI calls this before registering a table's store.
+    fn register_table_options_extension_from_scheme(&self, scheme: &str) {
+        self.ctx.register_table_options_extension_from_scheme(scheme)
+    }
+
+    async fn execute_logical_plan(
+        &self,
+        plan: LogicalPlan,
+    ) -> Result<DataFrame, DataFusionError> {
+        let new_plan = LogicalPlanBuilder::from(plan.clone())
+            .union(plan)?
+            .build()?;
+        self.ctx.execute_logical_plan(new_plan).await
+    }
+}
+
+/// Starts an interactive REPL backed by `MyUnionerContext`.
+#[tokio::main]
+pub async fn main() {
+    let mut print_options = PrintOptions {
+        format: datafusion_cli::print_format::PrintFormat::Automatic,
+        quiet: false,
+        maxrows: datafusion_cli::print_options::MaxRows::Unlimited,
+        color: true,
+    };
+    let mut unioner = MyUnionerContext::default();
+
+    // Any error returned by the REPL aborts the example with a panic.
+    exec_from_repl(&mut unioner, &mut print_options)
+        .await
+        .unwrap();
+}
diff --git a/datafusion-cli/src/cli_context.rs b/datafusion-cli/src/cli_context.rs
new file mode 100644
index 0000000000..516929ebac
--- /dev/null
+++ b/datafusion-cli/src/cli_context.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::{
+    dataframe::DataFrame,
+    error::DataFusionError,
+    execution::{context::SessionState, TaskContext},
+    logical_expr::LogicalPlan,
+    prelude::SessionContext,
+};
+use object_store::ObjectStore;
+
+use crate::object_storage::{AwsOptions, GcpOptions};
+
+/// A session context that the datafusion-cli REPL and command runners can drive.
+#[async_trait::async_trait]
+pub trait CliSessionContext {
+    /// Get an atomic reference counted task context.
+    fn task_ctx(&self) -> Arc<TaskContext>;
+
+    /// Get the session state used to plan statements and read table options.
+    fn session_state(&self) -> SessionState;
+
+    /// Register an object store for `url` with the session context.
+    fn register_object_store(
+        &self,
+        url: &url::Url,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore + 'static>>;
+
+    /// Register the table options extension (if any) for a URL scheme such as `s3`.
+    fn register_table_options_extension_from_scheme(&self, scheme: &str);
+
+    /// Execute a logical plan and return a DataFrame.
+    async fn execute_logical_plan(
+        &self,
+        plan: LogicalPlan,
+    ) -> Result<DataFrame, DataFusionError>;
+}
+
+/// Implements the CLI context for a plain `SessionContext` using its inherent methods.
+#[async_trait::async_trait]
+impl CliSessionContext for SessionContext {
+    fn task_ctx(&self) -> Arc<TaskContext> {
+        SessionContext::task_ctx(self)
+    }
+
+    fn session_state(&self) -> SessionState {
+        self.state()
+    }
+
+    fn register_object_store(
+        &self,
+        url: &url::Url,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore + 'static>> {
+        SessionContext::register_object_store(self, url, object_store)
+    }
+
+    fn register_table_options_extension_from_scheme(&self, scheme: &str) {
+        // Make the store-specific table options (e.g. the `gcp` prefix) settable:
+        match scheme {
+            // Amazon S3, Alibaba Cloud OSS and `cos` all use the AWS options:
+            "s3" | "oss" | "cos" => {
+                self.register_table_options_extension(AwsOptions::default())
+            }
+            // Google Cloud Storage uses the GCP options:
+            "gs" | "gcs" => {
+                self.register_table_options_extension(GcpOptions::default())
+            }
+            // Unsupported schemes need no extra options; do nothing.
+            _ => {}
+        }
+    }
+
+    async fn execute_logical_plan(
+        &self,
+        plan: LogicalPlan,
+    ) -> Result<DataFrame, DataFusionError> {
+        SessionContext::execute_logical_plan(self, plan).await
+    }
+}
diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs
index be6393351a..1a6c023d3b 100644
--- a/datafusion-cli/src/command.rs
+++ b/datafusion-cli/src/command.rs
@@ -17,6 +17,7 @@
 
 //! Command within CLI
 
+use crate::cli_context::CliSessionContext;
 use crate::exec::{exec_and_print, exec_from_lines};
 use crate::functions::{display_all_functions, Function};
 use crate::print_format::PrintFormat;
@@ -28,7 +29,6 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::exec_err;
 use datafusion::common::instant::Instant;
 use datafusion::error::{DataFusionError, Result};
-use datafusion::prelude::SessionContext;
 use std::fs::File;
 use std::io::BufReader;
 use std::str::FromStr;
@@ -55,7 +55,7 @@ pub enum OutputFormat {
 impl Command {
     pub async fn execute(
         &self,
-        ctx: &mut SessionContext,
+        ctx: &mut dyn CliSessionContext,
         print_options: &mut PrintOptions,
     ) -> Result<()> {
         match self {
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 855d6a7cbb..c4c92be152 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -23,12 +23,13 @@ use std::io::prelude::*;
 use std::io::BufReader;
 use std::str::FromStr;
 
+use crate::cli_context::CliSessionContext;
 use crate::helper::split_from_semicolon;
 use crate::print_format::PrintFormat;
 use crate::{
     command::{Command, OutputFormat},
     helper::{unescape_input, CliHelper},
-    object_storage::{get_object_store, register_options},
+    object_storage::get_object_store,
     print_options::{MaxRows, PrintOptions},
 };
 
@@ -38,7 +39,6 @@ use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_expr::{DdlStatement, LogicalPlan};
 use datafusion::physical_plan::{collect, execute_stream, 
ExecutionPlanProperties};
-use datafusion::prelude::SessionContext;
 use datafusion::sql::parser::{DFParser, Statement};
 use datafusion::sql::sqlparser::dialect::dialect_from_str;
 
@@ -50,7 +50,7 @@ use tokio::signal;
 
 /// run and execute SQL statements and commands, against a context with the 
given print options
 pub async fn exec_from_commands(
-    ctx: &mut SessionContext,
+    ctx: &mut dyn CliSessionContext,
     commands: Vec<String>,
     print_options: &PrintOptions,
 ) -> Result<()> {
@@ -63,7 +63,7 @@ pub async fn exec_from_commands(
 
 /// run and execute SQL statements and commands from a file, against a context 
with the given print options
 pub async fn exec_from_lines(
-    ctx: &mut SessionContext,
+    ctx: &mut dyn CliSessionContext,
     reader: &mut BufReader<File>,
     print_options: &PrintOptions,
 ) -> Result<()> {
@@ -103,7 +103,7 @@ pub async fn exec_from_lines(
 }
 
 pub async fn exec_from_files(
-    ctx: &mut SessionContext,
+    ctx: &mut dyn CliSessionContext,
     files: Vec<String>,
     print_options: &PrintOptions,
 ) -> Result<()> {
@@ -122,7 +122,7 @@ pub async fn exec_from_files(
 
 /// run and execute SQL statements and commands against a context with the 
given print options
 pub async fn exec_from_repl(
-    ctx: &mut SessionContext,
+    ctx: &mut dyn CliSessionContext,
     print_options: &mut PrintOptions,
 ) -> rustyline::Result<()> {
     let mut rl = Editor::new()?;
@@ -205,7 +205,7 @@ pub async fn exec_from_repl(
 }
 
 pub(super) async fn exec_and_print(
-    ctx: &mut SessionContext,
+    ctx: &mut dyn CliSessionContext,
     print_options: &PrintOptions,
     sql: String,
 ) -> Result<()> {
@@ -292,10 +292,10 @@ impl AdjustedPrintOptions {
 }
 
 async fn create_plan(
-    ctx: &mut SessionContext,
+    ctx: &mut dyn CliSessionContext,
     statement: Statement,
 ) -> Result<LogicalPlan, DataFusionError> {
-    let mut plan = ctx.state().statement_to_plan(statement).await?;
+    let mut plan = ctx.session_state().statement_to_plan(statement).await?;
 
     // Note that cmd is a mutable reference so that create_external_table 
function can remove all
     // datafusion-cli specific options before passing through to datafusion. 
Otherwise, datafusion
@@ -354,7 +354,7 @@ async fn create_plan(
 /// alteration fails, or if the object store cannot be retrieved and registered
 /// successfully.
 pub(crate) async fn register_object_store_and_config_extensions(
-    ctx: &SessionContext,
+    ctx: &dyn CliSessionContext,
     location: &String,
     options: &HashMap<String, String>,
     format: Option<FileType>,
@@ -369,17 +369,18 @@ pub(crate) async fn 
register_object_store_and_config_extensions(
     let url = table_path.as_ref();
 
     // Register the options based on the scheme extracted from the location
-    register_options(ctx, scheme);
+    ctx.register_table_options_extension_from_scheme(scheme);
 
     // Clone and modify the default table options based on the provided options
-    let mut table_options = ctx.state().default_table_options().clone();
+    let mut table_options = 
ctx.session_state().default_table_options().clone();
     if let Some(format) = format {
         table_options.set_file_format(format);
     }
     table_options.alter_with_string_hash_map(options)?;
 
     // Retrieve the appropriate object store based on the scheme, URL, and 
modified table options
-    let store = get_object_store(&ctx.state(), scheme, url, 
&table_options).await?;
+    let store =
+        get_object_store(&ctx.session_state(), scheme, url, 
&table_options).await?;
 
     // Register the retrieved object store in the session context's runtime 
environment
     ctx.register_object_store(url, store);
@@ -394,6 +395,7 @@ mod tests {
     use datafusion::common::config::FormatOptions;
     use datafusion::common::plan_err;
 
+    use datafusion::prelude::SessionContext;
     use url::Url;
 
     async fn create_external_table_test(location: &str, sql: &str) -> 
Result<()> {
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs
index 5081436aa6..fbfc9242a6 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/lib.rs
@@ -19,6 +19,7 @@
 pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
 
 pub mod catalog;
+pub mod cli_context;
 pub mod command;
 pub mod exec;
 pub mod functions;
diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index 85e0009bd2..87eb04d113 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -25,7 +25,6 @@ use datafusion::common::config::{
 use datafusion::common::{config_err, exec_datafusion_err, exec_err};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionState;
-use datafusion::prelude::SessionContext;
 
 use async_trait::async_trait;
 use aws_credential_types::provider::ProvideCredentials;
@@ -392,48 +391,6 @@ impl ConfigExtension for GcpOptions {
     const PREFIX: &'static str = "gcp";
 }
 
-/// Registers storage options for different cloud storage schemes in a given
-/// session context.
-///
-/// This function is responsible for extending the session context with 
specific
-/// options based on the storage scheme being used. These options are essential
-/// for handling interactions with different cloud storage services such as 
Amazon
-/// S3, Alibaba Cloud OSS, Google Cloud Storage, etc.
-///
-/// # Parameters
-///
-/// * `ctx` - A mutable reference to the session context where table options 
are
-///   to be registered. The session context holds configuration and environment
-///   for the current session.
-/// * `scheme` - A string slice that represents the cloud storage scheme. This
-///   determines which set of options will be registered in the session 
context.
-///
-/// # Supported Schemes
-///
-/// * `s3` or `oss` - Registers `AwsOptions` which are configurations specific 
to
-///   Amazon S3 and Alibaba Cloud OSS.
-/// * `gs` or `gcs` - Registers `GcpOptions` which are configurations specific 
to
-///   Google Cloud Storage.
-///
-/// NOTE: This function will not perform any action when given an unsupported 
scheme.
-pub(crate) fn register_options(ctx: &SessionContext, scheme: &str) {
-    // Match the provided scheme against supported cloud storage schemes:
-    match scheme {
-        // For Amazon S3 or Alibaba Cloud OSS
-        "s3" | "oss" | "cos" => {
-            // Register AWS specific table options in the session context:
-            ctx.register_table_options_extension(AwsOptions::default())
-        }
-        // For Google Cloud Storage
-        "gs" | "gcs" => {
-            // Register GCP specific table options in the session context:
-            ctx.register_table_options_extension(GcpOptions::default())
-        }
-        // For unsupported schemes, do nothing:
-        _ => {}
-    }
-}
-
 pub(crate) async fn get_object_store(
     state: &SessionState,
     scheme: &str,
@@ -498,6 +455,8 @@ pub(crate) async fn get_object_store(
 
 #[cfg(test)]
 mod tests {
+    use crate::cli_context::CliSessionContext;
+
     use super::*;
 
     use datafusion::common::plan_err;
@@ -534,7 +493,7 @@ mod tests {
         let mut plan = ctx.state().create_logical_plan(&sql).await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
-            register_options(&ctx, scheme);
+            ctx.register_table_options_extension_from_scheme(scheme);
             let mut table_options = 
ctx.state().default_table_options().clone();
             table_options.alter_with_string_hash_map(&cmd.options)?;
             let aws_options = 
table_options.extensions.get::<AwsOptions>().unwrap();
@@ -579,7 +538,7 @@ mod tests {
         let mut plan = ctx.state().create_logical_plan(&sql).await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
-            register_options(&ctx, scheme);
+            ctx.register_table_options_extension_from_scheme(scheme);
             let mut table_options = 
ctx.state().default_table_options().clone();
             table_options.alter_with_string_hash_map(&cmd.options)?;
             let aws_options = 
table_options.extensions.get::<AwsOptions>().unwrap();
@@ -605,7 +564,7 @@ mod tests {
         let mut plan = ctx.state().create_logical_plan(&sql).await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
-            register_options(&ctx, scheme);
+            ctx.register_table_options_extension_from_scheme(scheme);
             let mut table_options = 
ctx.state().default_table_options().clone();
             table_options.alter_with_string_hash_map(&cmd.options)?;
             let aws_options = 
table_options.extensions.get::<AwsOptions>().unwrap();
@@ -633,7 +592,7 @@ mod tests {
         let mut plan = ctx.state().create_logical_plan(&sql).await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
-            register_options(&ctx, scheme);
+            ctx.register_table_options_extension_from_scheme(scheme);
             let mut table_options = 
ctx.state().default_table_options().clone();
             table_options.alter_with_string_hash_map(&cmd.options)?;
             let aws_options = 
table_options.extensions.get::<AwsOptions>().unwrap();
@@ -670,7 +629,7 @@ mod tests {
         let mut plan = ctx.state().create_logical_plan(&sql).await?;
 
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut 
plan {
-            register_options(&ctx, scheme);
+            ctx.register_table_options_extension_from_scheme(scheme);
             let mut table_options = 
ctx.state().default_table_options().clone();
             table_options.alter_with_string_hash_map(&cmd.options)?;
             let gcp_options = 
table_options.extensions.get::<GcpOptions>().unwrap();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to