This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 861a2364bd feat: add CliSessionContext trait for cli (#10890)
861a2364bd is described below
commit 861a2364bdf04854482384c29d9b64962da377fe
Author: Trent Hauck <[email protected]>
AuthorDate: Mon Jun 17 10:16:20 2024 -0700
feat: add CliSessionContext trait for cli (#10890)
use CliSessionContext trait for cli
---
datafusion-cli/examples/cli-session-context.rs | 97 +++++++++++++++++++++++++
datafusion-cli/src/cli_context.rs | 98 ++++++++++++++++++++++++++
datafusion-cli/src/command.rs | 4 +-
datafusion-cli/src/exec.rs | 28 ++++----
datafusion-cli/src/lib.rs | 1 +
datafusion-cli/src/object_storage.rs | 55 ++-------------
6 files changed, 220 insertions(+), 63 deletions(-)
diff --git a/datafusion-cli/examples/cli-session-context.rs b/datafusion-cli/examples/cli-session-context.rs
new file mode 100644
index 0000000000..8da52ed84a
--- /dev/null
+++ b/datafusion-cli/examples/cli-session-context.rs
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shows an example of a custom session context that unions the input plan with itself.
+//! To run this example, use `cargo run --example cli-session-context` from within the `datafusion-cli` directory.
+
+use std::sync::Arc;
+
+use datafusion::{
+ dataframe::DataFrame,
+ error::DataFusionError,
+ execution::{context::SessionState, TaskContext},
+ logical_expr::{LogicalPlan, LogicalPlanBuilder},
+ prelude::SessionContext,
+};
+use datafusion_cli::{
+ cli_context::CliSessionContext, exec::exec_from_repl, print_options::PrintOptions,
+};
+use object_store::ObjectStore;
+
+/// This is a toy example of a custom session context that unions the input plan with itself.
+struct MyUnionerContext {
+ ctx: SessionContext,
+}
+
+impl Default for MyUnionerContext {
+ fn default() -> Self {
+ Self {
+ ctx: SessionContext::new(),
+ }
+ }
+}
+
+#[async_trait::async_trait]
+impl CliSessionContext for MyUnionerContext {
+ fn task_ctx(&self) -> Arc<TaskContext> {
+ self.ctx.task_ctx()
+ }
+
+ fn session_state(&self) -> SessionState {
+ self.ctx.state()
+ }
+
+ fn register_object_store(
+ &self,
+ url: &url::Url,
+ object_store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore + 'static>> {
+ self.ctx.register_object_store(url, object_store)
+ }
+
+ fn register_table_options_extension_from_scheme(&self, _scheme: &str) {
+ unimplemented!()
+ }
+
+ async fn execute_logical_plan(
+ &self,
+ plan: LogicalPlan,
+ ) -> Result<DataFrame, DataFusionError> {
+ let new_plan = LogicalPlanBuilder::from(plan.clone())
+ .union(plan.clone())?
+ .build()?;
+
+ self.ctx.execute_logical_plan(new_plan).await
+ }
+}
+
+#[tokio::main]
+/// Runs the example.
+pub async fn main() {
+ let mut my_ctx = MyUnionerContext::default();
+
+ let mut print_options = PrintOptions {
+ format: datafusion_cli::print_format::PrintFormat::Automatic,
+ quiet: false,
+ maxrows: datafusion_cli::print_options::MaxRows::Unlimited,
+ color: true,
+ };
+
+ exec_from_repl(&mut my_ctx, &mut print_options)
+ .await
+ .unwrap();
+}
diff --git a/datafusion-cli/src/cli_context.rs b/datafusion-cli/src/cli_context.rs
new file mode 100644
index 0000000000..516929ebac
--- /dev/null
+++ b/datafusion-cli/src/cli_context.rs
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use datafusion::{
+ dataframe::DataFrame,
+ error::DataFusionError,
+ execution::{context::SessionState, TaskContext},
+ logical_expr::LogicalPlan,
+ prelude::SessionContext,
+};
+use object_store::ObjectStore;
+
+use crate::object_storage::{AwsOptions, GcpOptions};
+
+#[async_trait::async_trait]
+/// The CLI session context trait provides a way to have a session context that can be used with datafusion's CLI code.
+pub trait CliSessionContext {
+ /// Get an atomic reference counted task context.
+ fn task_ctx(&self) -> Arc<TaskContext>;
+
+ /// Get the session state.
+ fn session_state(&self) -> SessionState;
+
+ /// Register an object store with the session context.
+ fn register_object_store(
+ &self,
+ url: &url::Url,
+ object_store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore + 'static>>;
+
+ /// Register table options extension from scheme.
+ fn register_table_options_extension_from_scheme(&self, scheme: &str);
+
+ /// Execute a logical plan and return a DataFrame.
+ async fn execute_logical_plan(
+ &self,
+ plan: LogicalPlan,
+ ) -> Result<DataFrame, DataFusionError>;
+}
+
+#[async_trait::async_trait]
+impl CliSessionContext for SessionContext {
+ fn task_ctx(&self) -> Arc<TaskContext> {
+ self.task_ctx()
+ }
+
+ fn session_state(&self) -> SessionState {
+ self.state()
+ }
+
+ fn register_object_store(
+ &self,
+ url: &url::Url,
+ object_store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore + 'static>> {
+ self.register_object_store(url, object_store)
+ }
+
+ fn register_table_options_extension_from_scheme(&self, scheme: &str) {
+ match scheme {
+ // For Amazon S3 or Alibaba Cloud OSS
+ "s3" | "oss" | "cos" => {
+ // Register AWS specific table options in the session context:
+ self.register_table_options_extension(AwsOptions::default())
+ }
+ // For Google Cloud Storage
+ "gs" | "gcs" => {
+ // Register GCP specific table options in the session context:
+ self.register_table_options_extension(GcpOptions::default())
+ }
+ // For unsupported schemes, do nothing:
+ _ => {}
+ }
+ }
+
+ async fn execute_logical_plan(
+ &self,
+ plan: LogicalPlan,
+ ) -> Result<DataFrame, DataFusionError> {
+ self.execute_logical_plan(plan).await
+ }
+}
diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs
index be6393351a..1a6c023d3b 100644
--- a/datafusion-cli/src/command.rs
+++ b/datafusion-cli/src/command.rs
@@ -17,6 +17,7 @@
//! Command within CLI
+use crate::cli_context::CliSessionContext;
use crate::exec::{exec_and_print, exec_from_lines};
use crate::functions::{display_all_functions, Function};
use crate::print_format::PrintFormat;
@@ -28,7 +29,6 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::exec_err;
use datafusion::common::instant::Instant;
use datafusion::error::{DataFusionError, Result};
-use datafusion::prelude::SessionContext;
use std::fs::File;
use std::io::BufReader;
use std::str::FromStr;
@@ -55,7 +55,7 @@ pub enum OutputFormat {
impl Command {
pub async fn execute(
&self,
- ctx: &mut SessionContext,
+ ctx: &mut dyn CliSessionContext,
print_options: &mut PrintOptions,
) -> Result<()> {
match self {
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 855d6a7cbb..c4c92be152 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -23,12 +23,13 @@ use std::io::prelude::*;
use std::io::BufReader;
use std::str::FromStr;
+use crate::cli_context::CliSessionContext;
use crate::helper::split_from_semicolon;
use crate::print_format::PrintFormat;
use crate::{
command::{Command, OutputFormat},
helper::{unescape_input, CliHelper},
- object_storage::{get_object_store, register_options},
+ object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};
@@ -38,7 +39,6 @@ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
-use datafusion::prelude::SessionContext;
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;
@@ -50,7 +50,7 @@ use tokio::signal;
/// run and execute SQL statements and commands, against a context with the given print options
pub async fn exec_from_commands(
- ctx: &mut SessionContext,
+ ctx: &mut dyn CliSessionContext,
commands: Vec<String>,
print_options: &PrintOptions,
) -> Result<()> {
@@ -63,7 +63,7 @@ pub async fn exec_from_commands(
/// run and execute SQL statements and commands from a file, against a context with the given print options
pub async fn exec_from_lines(
- ctx: &mut SessionContext,
+ ctx: &mut dyn CliSessionContext,
reader: &mut BufReader<File>,
print_options: &PrintOptions,
) -> Result<()> {
@@ -103,7 +103,7 @@ pub async fn exec_from_lines(
}
pub async fn exec_from_files(
- ctx: &mut SessionContext,
+ ctx: &mut dyn CliSessionContext,
files: Vec<String>,
print_options: &PrintOptions,
) -> Result<()> {
@@ -122,7 +122,7 @@ pub async fn exec_from_files(
/// run and execute SQL statements and commands against a context with the given print options
pub async fn exec_from_repl(
- ctx: &mut SessionContext,
+ ctx: &mut dyn CliSessionContext,
print_options: &mut PrintOptions,
) -> rustyline::Result<()> {
let mut rl = Editor::new()?;
@@ -205,7 +205,7 @@ pub async fn exec_from_repl(
}
pub(super) async fn exec_and_print(
- ctx: &mut SessionContext,
+ ctx: &mut dyn CliSessionContext,
print_options: &PrintOptions,
sql: String,
) -> Result<()> {
@@ -292,10 +292,10 @@ impl AdjustedPrintOptions {
}
async fn create_plan(
- ctx: &mut SessionContext,
+ ctx: &mut dyn CliSessionContext,
statement: Statement,
) -> Result<LogicalPlan, DataFusionError> {
- let mut plan = ctx.state().statement_to_plan(statement).await?;
+ let mut plan = ctx.session_state().statement_to_plan(statement).await?;
// Note that cmd is a mutable reference so that create_external_table function can remove all
// datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion
@@ -354,7 +354,7 @@ async fn create_plan(
/// alteration fails, or if the object store cannot be retrieved and registered
/// successfully.
pub(crate) async fn register_object_store_and_config_extensions(
- ctx: &SessionContext,
+ ctx: &dyn CliSessionContext,
location: &String,
options: &HashMap<String, String>,
format: Option<FileType>,
@@ -369,17 +369,18 @@ pub(crate) async fn register_object_store_and_config_extensions(
let url = table_path.as_ref();
// Register the options based on the scheme extracted from the location
- register_options(ctx, scheme);
+ ctx.register_table_options_extension_from_scheme(scheme);
// Clone and modify the default table options based on the provided options
- let mut table_options = ctx.state().default_table_options().clone();
+ let mut table_options = ctx.session_state().default_table_options().clone();
if let Some(format) = format {
table_options.set_file_format(format);
}
table_options.alter_with_string_hash_map(options)?;
// Retrieve the appropriate object store based on the scheme, URL, and modified table options
- let store = get_object_store(&ctx.state(), scheme, url, &table_options).await?;
+ let store =
+ get_object_store(&ctx.session_state(), scheme, url, &table_options).await?;
// Register the retrieved object store in the session context's runtime environment
ctx.register_object_store(url, store);
@@ -394,6 +395,7 @@ mod tests {
use datafusion::common::config::FormatOptions;
use datafusion::common::plan_err;
+ use datafusion::prelude::SessionContext;
use url::Url;
async fn create_external_table_test(location: &str, sql: &str) -> Result<()> {
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs
index 5081436aa6..fbfc9242a6 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/lib.rs
@@ -19,6 +19,7 @@
pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
pub mod catalog;
+pub mod cli_context;
pub mod command;
pub mod exec;
pub mod functions;
diff --git a/datafusion-cli/src/object_storage.rs b/datafusion-cli/src/object_storage.rs
index 85e0009bd2..87eb04d113 100644
--- a/datafusion-cli/src/object_storage.rs
+++ b/datafusion-cli/src/object_storage.rs
@@ -25,7 +25,6 @@ use datafusion::common::config::{
use datafusion::common::{config_err, exec_datafusion_err, exec_err};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionState;
-use datafusion::prelude::SessionContext;
use async_trait::async_trait;
use aws_credential_types::provider::ProvideCredentials;
@@ -392,48 +391,6 @@ impl ConfigExtension for GcpOptions {
const PREFIX: &'static str = "gcp";
}
-/// Registers storage options for different cloud storage schemes in a given
-/// session context.
-///
-/// This function is responsible for extending the session context with specific
-/// options based on the storage scheme being used. These options are essential
-/// for handling interactions with different cloud storage services such as Amazon
-/// S3, Alibaba Cloud OSS, Google Cloud Storage, etc.
-///
-/// # Parameters
-///
-/// * `ctx` - A mutable reference to the session context where table options are
-/// to be registered. The session context holds configuration and environment
-/// for the current session.
-/// * `scheme` - A string slice that represents the cloud storage scheme. This
-/// determines which set of options will be registered in the session context.
-///
-/// # Supported Schemes
-///
-/// * `s3` or `oss` - Registers `AwsOptions` which are configurations specific to
-/// Amazon S3 and Alibaba Cloud OSS.
-/// * `gs` or `gcs` - Registers `GcpOptions` which are configurations specific to
-/// Google Cloud Storage.
-///
-/// NOTE: This function will not perform any action when given an unsupported scheme.
-pub(crate) fn register_options(ctx: &SessionContext, scheme: &str) {
- // Match the provided scheme against supported cloud storage schemes:
- match scheme {
- // For Amazon S3 or Alibaba Cloud OSS
- "s3" | "oss" | "cos" => {
- // Register AWS specific table options in the session context:
- ctx.register_table_options_extension(AwsOptions::default())
- }
- // For Google Cloud Storage
- "gs" | "gcs" => {
- // Register GCP specific table options in the session context:
- ctx.register_table_options_extension(GcpOptions::default())
- }
- // For unsupported schemes, do nothing:
- _ => {}
- }
-}
-
pub(crate) async fn get_object_store(
state: &SessionState,
scheme: &str,
@@ -498,6 +455,8 @@ pub(crate) async fn get_object_store(
#[cfg(test)]
mod tests {
+ use crate::cli_context::CliSessionContext;
+
use super::*;
use datafusion::common::plan_err;
@@ -534,7 +493,7 @@ mod tests {
let mut plan = ctx.state().create_logical_plan(&sql).await?;
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
- register_options(&ctx, scheme);
+ ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options().clone();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
@@ -579,7 +538,7 @@ mod tests {
let mut plan = ctx.state().create_logical_plan(&sql).await?;
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
- register_options(&ctx, scheme);
+ ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options().clone();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
@@ -605,7 +564,7 @@ mod tests {
let mut plan = ctx.state().create_logical_plan(&sql).await?;
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
- register_options(&ctx, scheme);
+ ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options().clone();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
@@ -633,7 +592,7 @@ mod tests {
let mut plan = ctx.state().create_logical_plan(&sql).await?;
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
- register_options(&ctx, scheme);
+ ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options().clone();
table_options.alter_with_string_hash_map(&cmd.options)?;
let aws_options = table_options.extensions.get::<AwsOptions>().unwrap();
@@ -670,7 +629,7 @@ mod tests {
let mut plan = ctx.state().create_logical_plan(&sql).await?;
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
- register_options(&ctx, scheme);
+ ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options().clone();
table_options.alter_with_string_hash_map(&cmd.options)?;
let gcp_options = table_options.extensions.get::<GcpOptions>().unwrap();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]