This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 36e74303 Decommission BallistaContext ... (#1119)
36e74303 is described below

commit 36e743032520b9f8dc20a21368d080f546417999
Author: Marko Milenković <[email protected]>
AuthorDate: Tue Nov 19 20:47:05 2024 +0000

    Decommission BallistaContext ... (#1119)
    
    closes #1067
---
 ballista-cli/src/command.rs                 |   32 +-
 ballista-cli/src/exec.rs                    |    2 +-
 ballista-cli/src/main.rs                    |    6 +-
 ballista/client/Cargo.toml                  |    2 +-
 ballista/client/README.md                   |   10 +-
 ballista/client/src/context.rs              | 1044 ---------------------------
 ballista/client/src/lib.rs                  |    1 -
 ballista/client/src/prelude.rs              |   12 +-
 ballista/client/tests/context_standalone.rs |    4 +-
 ballista/client/tests/remote.rs             |    2 +-
 ballista/client/tests/setup.rs              |    2 +-
 ballista/client/tests/standalone.rs         |    3 +-
 examples/examples/remote-dataframe.rs       |    1 +
 examples/examples/remote-sql.rs             |    1 +
 examples/examples/standalone-sql.rs         |    3 +-
 15 files changed, 38 insertions(+), 1087 deletions(-)

diff --git a/ballista-cli/src/command.rs b/ballista-cli/src/command.rs
index e5d88729..3860e46d 100644
--- a/ballista-cli/src/command.rs
+++ b/ballista-cli/src/command.rs
@@ -21,11 +21,11 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Instant;
 
-use ballista::prelude::{BallistaError, Result};
-
 use datafusion::arrow::array::{ArrayRef, StringArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::common::Result;
+use datafusion::error::DataFusionError;
 use datafusion::prelude::SessionContext;
 
 use crate::functions::{display_all_functions, Function};
@@ -60,25 +60,23 @@ impl Command {
             Self::Help =>
             // TODO need to provide valid schema
             {
-                print_options
-                    .print_batches(Arc::new(Schema::empty()), 
&[all_commands_info()], now)
-                    .map_err(BallistaError::DataFusionError)
+                print_options.print_batches(
+                    Arc::new(Schema::empty()),
+                    &[all_commands_info()],
+                    now,
+                )
             }
             Self::ListTables => {
                 let df = ctx.sql("SHOW TABLES").await?;
                 let schema = Arc::new(df.schema().as_arrow().clone());
                 let batches = df.collect().await?;
-                print_options
-                    .print_batches(schema, &batches, now)
-                    .map_err(BallistaError::DataFusionError)
+                print_options.print_batches(schema, &batches, now)
             }
             Self::DescribeTable(name) => {
                 let df = ctx.sql(&format!("SHOW COLUMNS FROM {name}")).await?;
                 let schema = Arc::new(df.schema().as_arrow().clone());
                 let batches = df.collect().await?;
-                print_options
-                    .print_batches(schema, &batches, now)
-                    .map_err(BallistaError::DataFusionError)
+                print_options.print_batches(schema, &batches, now)
             }
             Self::QuietMode(quiet) => {
                 if let Some(quiet) = quiet {
@@ -95,12 +93,10 @@ impl Command {
                 }
                 Ok(())
             }
-            Self::Quit => Err(BallistaError::Internal(
+            Self::Quit => Err(DataFusionError::Internal(
                 "Unexpected quit, this should be handled outside".to_string(),
             )),
-            Self::ListFunctions => {
-                display_all_functions().map_err(BallistaError::DataFusionError)
-            }
+            Self::ListFunctions => display_all_functions(),
             Self::SearchFunctions(function) => {
                 if let Ok(func) = function.parse::<Function>() {
                     let details = func.function_details()?;
@@ -108,10 +104,10 @@ impl Command {
                     Ok(())
                 } else {
                     let msg = format!("{function} is not a supported 
function");
-                    Err(BallistaError::NotImplemented(msg))
+                    Err(DataFusionError::NotImplemented(msg))
                 }
             }
-            Self::OutputFormat(_) => Err(BallistaError::Internal(
+            Self::OutputFormat(_) => Err(DataFusionError::Internal(
                 "Unexpected change output format, this should be handled 
outside"
                     .to_string(),
             )),
@@ -221,7 +217,7 @@ impl OutputFormat {
                     println!("Output format is {:?}.", print_options.format);
                     Ok(())
                 } else {
-                    Err(BallistaError::General(format!(
+                    Err(DataFusionError::Execution(format!(
                         "{:?} is not a valid format type [possible values: 
{:?}]",
                         format,
                         "TO BE FIXED", //PrintFormat::value_variants()
diff --git a/ballista-cli/src/exec.rs b/ballista-cli/src/exec.rs
index 8795b4e6..e9dd870d 100644
--- a/ballista-cli/src/exec.rs
+++ b/ballista-cli/src/exec.rs
@@ -23,7 +23,7 @@ use std::io::BufReader;
 use std::sync::Arc;
 use std::time::Instant;
 
-use ballista::prelude::Result;
+use datafusion::common::Result;
 use datafusion::prelude::SessionContext;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs
index 3c9902a7..caec5ed5 100644
--- a/ballista-cli/src/main.rs
+++ b/ballista-cli/src/main.rs
@@ -18,15 +18,13 @@
 use std::env;
 use std::path::Path;
 
-use ballista::{
-    extension::SessionConfigExt,
-    prelude::{Result, SessionContextExt},
-};
+use ballista::{extension::SessionConfigExt, prelude::SessionContextExt};
 use ballista_cli::{
     exec, print_format::PrintFormat, print_options::PrintOptions, 
BALLISTA_CLI_VERSION,
 };
 use clap::Parser;
 use datafusion::{
+    common::Result,
     execution::SessionStateBuilder,
     prelude::{SessionConfig, SessionContext},
 };
diff --git a/ballista/client/Cargo.toml b/ballista/client/Cargo.toml
index 219974d0..a5d93030 100644
--- a/ballista/client/Cargo.toml
+++ b/ballista/client/Cargo.toml
@@ -50,6 +50,6 @@ object_store = { workspace = true, features = ["aws"] }
 testcontainers-modules = { version = "0.11", features = ["minio"] }
 
 [features]
-default = []
+default = ["standalone"]
 standalone = ["ballista-executor", "ballista-scheduler"]
 testcontainers = []
diff --git a/ballista/client/README.md b/ballista/client/README.md
index 503d6e08..4f87b064 100644
--- a/ballista/client/README.md
+++ b/ballista/client/README.md
@@ -92,16 +92,16 @@ data set. Download the file and add it to the `testdata` 
folder before running t
 
 ```rust,no_run
 use ballista::prelude::*;
-use datafusion::prelude::{col, ParquetReadOptions};
+use datafusion::common::Result;
+use datafusion::prelude::{col, SessionContext, ParquetReadOptions};
 use datafusion::functions_aggregate::{min_max::min, min_max::max, sum::sum, 
average::avg};
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    // create configuration
-    let config = BallistaConfig::default();
+
 
     // connect to Ballista scheduler
-    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
+    let ctx = SessionContext::remote("df://localhost:50050").await?;
 
     let filename = "testdata/yellow_tripdata_2022-01.parquet";
 
@@ -154,4 +154,4 @@ The output should look similar to the following table.
 
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
 ```
 
-More [examples](https://github.com/apache/arrow-ballista/tree/main/examples) 
can be found in the arrow-ballista repository.
+More [examples](../../examples/examples/) can be found in the arrow-ballista 
repository.
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
deleted file mode 100644
index 48128537..00000000
--- a/ballista/client/src/context.rs
+++ /dev/null
@@ -1,1044 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Distributed execution context.
-#![allow(deprecated)] // TO BE REMOVED
-
-use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::execution::context::DataFilePaths;
-use datafusion::sql::sqlparser::ast::Statement;
-use datafusion::sql::TableReference;
-use log::info;
-use parking_lot::Mutex;
-use std::collections::HashMap;
-use std::sync::Arc;
-
-use ballista_core::config::BallistaConfig;
-use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
-use ballista_core::serde::protobuf::{CreateSessionParams, KeyValuePair};
-use ballista_core::utils::{
-    create_df_ctx_with_ballista_query_planner, create_grpc_client_connection,
-    SessionConfigExt,
-};
-use datafusion_proto::protobuf::LogicalPlanNode;
-
-use datafusion::dataframe::DataFrame;
-use datafusion::datasource::{source_as_provider, TableProvider};
-use datafusion::error::{DataFusionError, Result};
-use datafusion::logical_expr::{
-    CreateExternalTable, DdlStatement, LogicalPlan, TableScan,
-};
-use datafusion::prelude::{
-    AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
-    SessionConfig, SessionContext,
-};
-use datafusion::sql::parser::{DFParser, Statement as DFStatement};
-
-struct BallistaContextState {
-    /// Ballista configuration
-    config: BallistaConfig,
-    /// Scheduler host
-    scheduler_host: String,
-    /// Scheduler port
-    scheduler_port: u16,
-    /// Tables that have been registered with this context
-    tables: HashMap<String, Arc<dyn TableProvider>>,
-}
-
-impl BallistaContextState {
-    pub fn new(
-        scheduler_host: String,
-        scheduler_port: u16,
-        config: &BallistaConfig,
-    ) -> Self {
-        Self {
-            config: config.clone(),
-            scheduler_host,
-            scheduler_port,
-            tables: HashMap::new(),
-        }
-    }
-
-    pub fn config(&self) -> &BallistaConfig {
-        &self.config
-    }
-}
-
-#[deprecated]
-pub struct BallistaContext {
-    state: Arc<Mutex<BallistaContextState>>,
-    context: Arc<SessionContext>,
-}
-
-impl BallistaContext {
-    /// Create a context for executing queries against a remote Ballista 
scheduler instance
-    pub async fn remote(
-        host: &str,
-        port: u16,
-        config: &BallistaConfig,
-    ) -> ballista_core::error::Result<Self> {
-        let state = BallistaContextState::new(host.to_owned(), port, config);
-
-        let scheduler_url =
-            format!("http://{}:{}";, &state.scheduler_host, 
state.scheduler_port);
-        info!(
-            "Connecting to Ballista scheduler at {}",
-            scheduler_url.clone()
-        );
-        let connection = create_grpc_client_connection(scheduler_url.clone())
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
-        let limit = config.default_grpc_client_max_message_size();
-        let mut scheduler = SchedulerGrpcClient::new(connection)
-            .max_encoding_message_size(limit)
-            .max_decoding_message_size(limit);
-
-        let remote_session_id = scheduler
-            .create_session(CreateSessionParams {
-                settings: config
-                    .settings()
-                    .iter()
-                    .map(|(k, v)| KeyValuePair {
-                        key: k.to_owned(),
-                        value: v.to_owned(),
-                    })
-                    .collect::<Vec<_>>(),
-            })
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
-            .into_inner()
-            .session_id;
-
-        info!(
-            "Server side SessionContext created with session id: {}",
-            remote_session_id
-        );
-
-        let ctx = {
-            create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
-                scheduler_url,
-                remote_session_id,
-                state.config(),
-            )
-        };
-
-        Ok(Self {
-            state: Arc::new(Mutex::new(state)),
-            context: Arc::new(ctx),
-        })
-    }
-
-    #[cfg(feature = "standalone")]
-    pub async fn standalone(
-        config: &BallistaConfig,
-        concurrent_tasks: usize,
-    ) -> ballista_core::error::Result<Self> {
-        use ballista_core::serde::BallistaCodec;
-        use datafusion_proto::protobuf::PhysicalPlanNode;
-
-        log::info!("Running in local mode. Scheduler will be run in-proc");
-
-        let addr = 
ballista_scheduler::standalone::new_standalone_scheduler().await?;
-        let scheduler_url = format!("http://localhost:{}";, addr.port());
-        let mut scheduler = loop {
-            match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
-                Err(_) => {
-                    
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
-                    log::info!("Attempting to connect to in-proc 
scheduler...");
-                }
-                Ok(scheduler) => break scheduler,
-            }
-        };
-
-        let remote_session_id = scheduler
-            .create_session(CreateSessionParams {
-                settings: config
-                    .settings()
-                    .iter()
-                    .map(|(k, v)| KeyValuePair {
-                        key: k.to_owned(),
-                        value: v.to_owned(),
-                    })
-                    .collect::<Vec<_>>(),
-            })
-            .await
-            .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?
-            .into_inner()
-            .session_id;
-
-        info!(
-            "Server side SessionContext created with session id: {}",
-            remote_session_id
-        );
-
-        let ctx = {
-            create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
-                scheduler_url,
-                remote_session_id,
-                config,
-            )
-        };
-
-        let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
-            BallistaCodec::default();
-
-        ballista_executor::new_standalone_executor(
-            scheduler,
-            concurrent_tasks,
-            default_codec,
-        )
-        .await?;
-
-        let state =
-            BallistaContextState::new("localhost".to_string(), addr.port(), 
config);
-
-        Ok(Self {
-            state: Arc::new(Mutex::new(state)),
-            context: Arc::new(ctx),
-        })
-    }
-
-    /// Create a DataFrame representing an Json table scan
-    pub async fn read_json<P: DataFilePaths>(
-        &self,
-        paths: P,
-        options: NdJsonReadOptions<'_>,
-    ) -> Result<DataFrame> {
-        let df = self.context.read_json(paths, options).await?;
-        Ok(df)
-    }
-
-    /// Create a DataFrame representing an Avro table scan
-    pub async fn read_avro<P: DataFilePaths>(
-        &self,
-        paths: P,
-        options: AvroReadOptions<'_>,
-    ) -> Result<DataFrame> {
-        let df = self.context.read_avro(paths, options).await?;
-        Ok(df)
-    }
-
-    /// Create a DataFrame representing a Parquet table scan
-    pub async fn read_parquet<P: DataFilePaths>(
-        &self,
-        paths: P,
-        options: ParquetReadOptions<'_>,
-    ) -> Result<DataFrame> {
-        let df = self.context.read_parquet(paths, options).await?;
-        Ok(df)
-    }
-
-    /// Create a DataFrame representing a CSV table scan
-    pub async fn read_csv<P: DataFilePaths>(
-        &self,
-        paths: P,
-        options: CsvReadOptions<'_>,
-    ) -> Result<DataFrame> {
-        let df = self.context.read_csv(paths, options).await?;
-        Ok(df)
-    }
-
-    /// Register a DataFrame as a table that can be referenced from a SQL query
-    pub fn register_table(
-        &self,
-        name: &str,
-        table: Arc<dyn TableProvider>,
-    ) -> Result<()> {
-        let mut state = self.state.lock();
-        state.tables.insert(name.to_owned(), table);
-        Ok(())
-    }
-
-    pub async fn register_csv(
-        &self,
-        name: &str,
-        path: &str,
-        options: CsvReadOptions<'_>,
-    ) -> Result<()> {
-        let plan = self
-            .read_csv(path, options)
-            .await
-            .map_err(|e| {
-                DataFusionError::Context(format!("Can't read CSV: {path}"), 
Box::new(e))
-            })?
-            .into_optimized_plan()?;
-        match plan {
-            LogicalPlan::TableScan(TableScan { source, .. }) => {
-                self.register_table(name, source_as_provider(&source)?)
-            }
-            _ => Err(DataFusionError::Internal("Expected tables 
scan".to_owned())),
-        }
-    }
-
-    pub async fn register_parquet(
-        &self,
-        name: &str,
-        path: &str,
-        options: ParquetReadOptions<'_>,
-    ) -> Result<()> {
-        match self
-            .read_parquet(path, options)
-            .await?
-            .into_optimized_plan()?
-        {
-            LogicalPlan::TableScan(TableScan { source, .. }) => {
-                self.register_table(name, source_as_provider(&source)?)
-            }
-            _ => Err(DataFusionError::Internal("Expected tables 
scan".to_owned())),
-        }
-    }
-
-    pub async fn register_avro(
-        &self,
-        name: &str,
-        path: &str,
-        options: AvroReadOptions<'_>,
-    ) -> Result<()> {
-        match self.read_avro(path, options).await?.into_optimized_plan()? {
-            LogicalPlan::TableScan(TableScan { source, .. }) => {
-                self.register_table(name, source_as_provider(&source)?)
-            }
-            _ => Err(DataFusionError::Internal("Expected tables 
scan".to_owned())),
-        }
-    }
-
-    /// is a 'show *' sql
-    pub async fn is_show_statement(&self, sql: &str) -> Result<bool> {
-        let mut is_show_variable: bool = false;
-        let statements = DFParser::parse_sql(sql)?;
-
-        if statements.len() != 1 {
-            return Err(DataFusionError::NotImplemented(
-                "The context currently only supports a single SQL 
statement".to_string(),
-            ));
-        }
-
-        if let DFStatement::Statement(st) = &statements[0] {
-            match **st {
-                Statement::ShowVariable { .. } => {
-                    is_show_variable = true;
-                }
-                Statement::ShowColumns { .. } => {
-                    is_show_variable = true;
-                }
-                Statement::ShowTables { .. } => {
-                    is_show_variable = true;
-                }
-                _ => {
-                    is_show_variable = false;
-                }
-            }
-        };
-
-        Ok(is_show_variable)
-    }
-
-    pub fn context(&self) -> &SessionContext {
-        &self.context
-    }
-
-    /// Create a DataFrame from a SQL statement.
-    ///
-    /// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
-    /// might require the schema to be inferred.
-    pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
-        let mut ctx = self.context.clone();
-
-        let is_show = self.is_show_statement(sql).await?;
-        // the show tables、 show columns sql can not run at scheduler because 
the tables is store at client
-        if is_show {
-            ctx = Arc::new(SessionContext::new_with_config(
-                SessionConfig::new_with_ballista(),
-            ));
-        }
-
-        // register tables with DataFusion context
-        {
-            let state = self.state.lock();
-            for (name, prov) in &state.tables {
-                // ctx is shared between queries, check table exists or not 
before register
-                let table_ref = TableReference::Bare {
-                    table: name.as_str().into(),
-                };
-                if !ctx.table_exist(table_ref)? {
-                    ctx.register_table(
-                        TableReference::Bare {
-                            table: name.as_str().into(),
-                        },
-                        Arc::clone(prov),
-                    )?;
-                }
-            }
-        }
-
-        let plan = ctx.state().create_logical_plan(sql).await?;
-
-        match &plan {
-            LogicalPlan::Ddl(DdlStatement::CreateExternalTable(
-                CreateExternalTable {
-                    ref schema,
-                    ref name,
-                    ref location,
-                    ref file_type,
-                    ref table_partition_cols,
-                    ref if_not_exists,
-                    options,
-                    ..
-                },
-            )) => {
-                let table_exists = ctx.table_exist(name.to_owned())?;
-                let schema: SchemaRef = 
Arc::new(schema.as_ref().to_owned().into());
-                let table_partition_cols = table_partition_cols
-                    .iter()
-                    .map(|col| {
-                        schema
-                            .field_with_name(col)
-                            .map(|f| (f.name().to_owned(), 
f.data_type().to_owned()))
-                            .map_err(|e| DataFusionError::ArrowError(e, None))
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                match (if_not_exists, table_exists) {
-                    (_, false) => match file_type.to_lowercase().as_str() {
-                        "csv" => {
-                            let has_header = match 
options.get("format.has_header") {
-                                Some(str) => str.parse::<bool>().unwrap(),
-                                None => false,
-                            };
-                            let delimiter = match 
options.get("format.delimiter") {
-                                Some(str) => str.chars().next().unwrap(),
-                                None => ',',
-                            };
-                            let mut options = CsvReadOptions::new()
-                                .has_header(has_header)
-                                .delimiter(delimiter as u8)
-                                
.table_partition_cols(table_partition_cols.to_vec());
-                            if !schema.fields().is_empty() {
-                                options = options.schema(&schema);
-                            }
-                            self.register_csv(name.table(), location, 
options).await?;
-                            Ok(DataFrame::new(ctx.state(), plan))
-                        }
-                        "parquet" => {
-                            self.register_parquet(
-                                name.table(),
-                                location,
-                                ParquetReadOptions::default()
-                                    
.table_partition_cols(table_partition_cols),
-                            )
-                            .await?;
-                            Ok(DataFrame::new(ctx.state(), plan))
-                        }
-                        "avro" => {
-                            self.register_avro(
-                                name.table(),
-                                location,
-                                AvroReadOptions::default()
-                                    
.table_partition_cols(table_partition_cols),
-                            )
-                            .await?;
-                            Ok(DataFrame::new(ctx.state(), plan))
-                        }
-                        _ => Err(DataFusionError::NotImplemented(format!(
-                            "Unsupported file type {file_type:?}."
-                        ))),
-                    },
-                    (true, true) => Ok(DataFrame::new(ctx.state(), plan)),
-                    (false, true) => Err(DataFusionError::Execution(format!(
-                        "Table '{name:?}' already exists"
-                    ))),
-                }
-            }
-            _ => ctx.execute_logical_plan(plan).await,
-        }
-    }
-
-    /// Execute the [`LogicalPlan`], return a [`DataFrame`]. This API
-    /// is not featured limited (so all SQL such as `CREATE TABLE` and
-    /// `COPY` will be run).
-    ///
-    /// If you wish to limit the type of plan that can be run from
-    /// SQL, see [`Self::sql_with_options`] and
-    /// [`SQLOptions::verify_plan`].
-    pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> 
Result<DataFrame> {
-        let ctx = self.context.clone();
-        ctx.execute_logical_plan(plan).await
-    }
-}
-
-#[cfg(test)]
-#[cfg(feature = "standalone")]
-mod standalone_tests {
-    use ballista_core::config::BallistaConfig;
-    use datafusion::arrow;
-    use datafusion::arrow::util::pretty::pretty_format_batches;
-
-    use crate::context::BallistaContext;
-    use ballista_core::error::Result;
-    use datafusion::config::TableParquetOptions;
-    use datafusion::dataframe::DataFrameWriteOptions;
-    use datafusion::datasource::listing::ListingTableUrl;
-    use datafusion::prelude::ParquetReadOptions;
-    use tempfile::TempDir;
-
-    #[tokio::test]
-    async fn test_standalone_mode() {
-        use super::*;
-        let context = BallistaContext::standalone(&BallistaConfig::default(), 
1)
-            .await
-            .unwrap();
-        let df = context.sql("SELECT 1;").await.unwrap();
-        df.collect().await.unwrap();
-    }
-
-    #[tokio::test]
-    async fn test_write_parquet() -> Result<()> {
-        use super::*;
-        let context = BallistaContext::standalone(&BallistaConfig::default(), 
1).await?;
-        let df = context.sql("SELECT 1;").await?;
-        let tmp_dir = TempDir::new().unwrap();
-        let file_path = format!(
-            "{}",
-            tmp_dir.path().join("test_write_parquet.parquet").display()
-        );
-        df.write_parquet(
-            &file_path,
-            DataFrameWriteOptions::default(),
-            Some(TableParquetOptions::default()),
-        )
-        .await?;
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_write_csv() -> Result<()> {
-        use super::*;
-        let context = BallistaContext::standalone(&BallistaConfig::default(), 
1).await?;
-        let df = context.sql("SELECT 1;").await?;
-        let tmp_dir = TempDir::new().unwrap();
-        let file_path =
-            format!("{}", tmp_dir.path().join("test_write_csv.csv").display());
-        df.write_csv(&file_path, DataFrameWriteOptions::default(), None)
-            .await?;
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_ballista_show_tables() {
-        use super::*;
-        use std::fs::File;
-        use std::io::Write;
-        use tempfile::TempDir;
-        let context = BallistaContext::standalone(&BallistaConfig::default(), 
1)
-            .await
-            .unwrap();
-
-        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
-                    Andrew,2018-11-13T17:11:10.011Z";
-
-        let tmp_dir = TempDir::new().unwrap();
-        let file_path = tmp_dir.path().join("timestamps.csv");
-
-        // scope to ensure the file is closed and written
-        {
-            File::create(&file_path)
-                .expect("creating temp file")
-                .write_all(data.as_bytes())
-                .expect("writing data");
-        }
-
-        let sql = format!(
-            "CREATE EXTERNAL TABLE csv_with_timestamps (
-                  name VARCHAR,
-                  ts TIMESTAMP
-              )
-              STORED AS CSV
-              LOCATION '{}'
-              OPTIONS ('has_header' 'false', 'delimiter' ',')
-              ",
-            file_path.to_str().expect("path is utf8")
-        );
-
-        context.sql(sql.as_str()).await.unwrap();
-
-        let df = context.sql("show columns from csv_with_timestamps;").await;
-
-        assert!(df.is_err());
-    }
-
-    #[tokio::test]
-    #[ignore = "this one fails after config change (will be removed)"]
-    async fn test_show_tables_not_with_information_schema() {
-        use super::*;
-
-        use std::fs::File;
-        use std::io::Write;
-        use tempfile::TempDir;
-        let config = BallistaConfig::default();
-        let context = BallistaContext::standalone(&config, 1).await.unwrap();
-
-        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
-                    Andrew,2018-11-13T17:11:10.011Z";
-
-        let tmp_dir = TempDir::new().unwrap();
-        let file_path = tmp_dir.path().join("timestamps.csv");
-
-        // scope to ensure the file is closed and written
-        {
-            File::create(&file_path)
-                .expect("creating temp file")
-                .write_all(data.as_bytes())
-                .expect("writing data");
-        }
-
-        let sql = format!(
-            "CREATE EXTERNAL TABLE csv_with_timestamps (
-                  name VARCHAR,
-                  ts TIMESTAMP
-              )
-              STORED AS CSV
-              LOCATION '{}'
-              ",
-            file_path.to_str().expect("path is utf8")
-        );
-
-        context.sql(sql.as_str()).await.unwrap();
-        let df = context.sql("show tables;").await;
-        assert!(df.is_ok());
-    }
-
-    #[tokio::test]
-    #[ignore]
-    // Tracking: https://github.com/apache/arrow-datafusion/issues/1840
-    async fn test_task_stuck_when_referenced_task_failed() {
-        use super::*;
-        use datafusion::arrow::datatypes::Schema;
-        use datafusion::arrow::util::pretty;
-        use datafusion::datasource::file_format::csv::CsvFormat;
-        use datafusion::datasource::listing::{
-            ListingOptions, ListingTable, ListingTableConfig,
-        };
-
-        let config = BallistaConfig::default();
-        let context = BallistaContext::standalone(&config, 1).await.unwrap();
-
-        context
-            .register_parquet(
-                "single_nan",
-                "testdata/single_nan.parquet",
-                ParquetReadOptions::default(),
-            )
-            .await
-            .unwrap();
-
-        {
-            let mut guard = context.state.lock();
-            let csv_table = guard.tables.get("single_nan");
-
-            if let Some(table_provide) = csv_table {
-                if let Some(listing_table) = table_provide
-                    .clone()
-                    .as_any()
-                    .downcast_ref::<ListingTable>()
-                {
-                    let x = listing_table.options();
-                    let error_options = ListingOptions {
-                        file_extension: x.file_extension.clone(),
-                        format: Arc::new(CsvFormat::default()),
-                        table_partition_cols: x.table_partition_cols.clone(),
-                        collect_stat: x.collect_stat,
-                        target_partitions: x.target_partitions,
-                        file_sort_order: vec![],
-                    };
-
-                    let table_paths = listing_table
-                        .table_paths()
-                        .iter()
-                        .map(|t| ListingTableUrl::parse(t).unwrap())
-                        .collect();
-                    let config = 
ListingTableConfig::new_with_multi_paths(table_paths)
-                        .with_schema(Arc::new(Schema::empty()))
-                        .with_listing_options(error_options);
-
-                    let error_table = ListingTable::try_new(config).unwrap();
-
-                    // change the table to an error table
-                    guard
-                        .tables
-                        .insert("single_nan".to_string(), 
Arc::new(error_table));
-                }
-            }
-        }
-
-        let df = context
-            .sql("select count(1) from single_nan;")
-            .await
-            .unwrap();
-        let results = df.collect().await.unwrap();
-        pretty::print_batches(&results).unwrap();
-    }
-
-    #[tokio::test]
-    async fn test_empty_exec_with_one_row() {
-        use crate::context::BallistaContext;
-
-        let config = BallistaConfig::default();
-        let context = BallistaContext::standalone(&config, 1).await.unwrap();
-
-        let sql = "select EXTRACT(year FROM 
to_timestamp('2020-09-08T12:13:14+00:00'));";
-
-        let df = context.sql(sql).await.unwrap();
-        assert!(!df.collect().await.unwrap().is_empty());
-    }
-
-    #[tokio::test]
-    async fn test_union_and_union_all() {
-        use super::*;
-        use datafusion::arrow::util::pretty::pretty_format_batches;
-        let config = BallistaConfig::default();
-        let context = BallistaContext::standalone(&config, 1).await.unwrap();
-
-        let df = context
-            .sql("SELECT 1 as NUMBER union SELECT 1 as NUMBER;")
-            .await
-            .unwrap();
-        let res1 = df.collect().await.unwrap();
-        let expected1 = vec![
-            "+--------+",
-            "| number |",
-            "+--------+",
-            "| 1      |",
-            "+--------+",
-        ];
-        assert_eq!(
-            expected1,
-            pretty_format_batches(&res1)
-                .unwrap()
-                .to_string()
-                .trim()
-                .lines()
-                .collect::<Vec<&str>>()
-        );
-        let expected2 = vec![
-            "+--------+",
-            "| number |",
-            "+--------+",
-            "| 1      |",
-            "| 1      |",
-            "+--------+",
-        ];
-        let df = context
-            .sql("SELECT 1 as NUMBER union all SELECT 1 as NUMBER;")
-            .await
-            .unwrap();
-        let res2 = df.collect().await.unwrap();
-        assert_eq!(
-            expected2,
-            pretty_format_batches(&res2)
-                .unwrap()
-                .to_string()
-                .trim()
-                .lines()
-                .collect::<Vec<&str>>()
-        );
-    }
-
-    #[tokio::test]
-    async fn test_aggregate_min_max() {
-        let context = create_test_context().await;
-
-        let df = context.sql("select min(\"id\") from test").await.unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+--------------+",
-            "| min(test.id) |",
-            "+--------------+",
-            "| 0            |",
-            "+--------------+",
-        ];
-        assert_result_eq(expected, &res);
-
-        let df = context.sql("select max(\"id\") from test").await.unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+--------------+",
-            "| max(test.id) |",
-            "+--------------+",
-            "| 7            |",
-            "+--------------+",
-        ];
-        assert_result_eq(expected, &res);
-    }
-
-    #[tokio::test]
-    async fn test_aggregate_sum() {
-        let context = create_test_context().await;
-
-        let df = context.sql("select SUM(\"id\") from test").await.unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+--------------+",
-            "| sum(test.id) |",
-            "+--------------+",
-            "| 28           |",
-            "+--------------+",
-        ];
-        assert_result_eq(expected, &res);
-    }
-    #[tokio::test]
-    async fn test_aggregate_avg() {
-        let context = create_test_context().await;
-
-        let df = context.sql("select AVG(\"id\") from test").await.unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+--------------+",
-            "| avg(test.id) |",
-            "+--------------+",
-            "| 3.5          |",
-            "+--------------+",
-        ];
-        assert_result_eq(expected, &res);
-    }
-
-    #[tokio::test]
-    async fn test_aggregate_count() {
-        let context = create_test_context().await;
-
-        let df = context.sql("select COUNT(\"id\") from test").await.unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+----------------+",
-            "| count(test.id) |",
-            "+----------------+",
-            "| 8              |",
-            "+----------------+",
-        ];
-        assert_result_eq(expected, &res);
-    }
-    #[tokio::test]
-    async fn test_aggregate_approx_distinct() {
-        let context = create_test_context().await;
-
-        let df = context
-            .sql("select approx_distinct(\"id\") from test")
-            .await
-            .unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+--------------------------+",
-            "| approx_distinct(test.id) |",
-            "+--------------------------+",
-            "| 8                        |",
-            "+--------------------------+",
-        ];
-        assert_result_eq(expected, &res);
-    }
-    #[tokio::test]
-    async fn test_aggregate_array_agg() {
-        let context = create_test_context().await;
-
-        let df = context
-            .sql("select ARRAY_AGG(\"id\") from test")
-            .await
-            .unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+--------------------------+",
-            "| array_agg(test.id)       |",
-            "+--------------------------+",
-            "| [4, 5, 6, 7, 2, 3, 0, 1] |",
-            "+--------------------------+",
-        ];
-        assert_result_eq(expected, &res);
-    }
-    #[tokio::test]
-    async fn test_aggregate_var() {
-        let context = create_test_context().await;
-
-        let df = context.sql("select VAR(\"id\") from test").await.unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+-------------------+",
-            "| var(test.id)      |",
-            "+-------------------+",
-            "| 6.000000000000001 |",
-            "+-------------------+",
-        ];
-        assert_result_eq(expected, &res);
-
-        let df = context
-            .sql("select VAR_POP(\"id\") from test")
-            .await
-            .unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+-------------------+",
-            "| var_pop(test.id)  |",
-            "+-------------------+",
-            "| 5.250000000000001 |",
-            "+-------------------+",
-        ];
-        assert_result_eq(expected, &res);
-
-        let df = context
-            .sql("select VAR_SAMP(\"id\") from test")
-            .await
-            .unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+-------------------+",
-            "| var(test.id)      |",
-            "+-------------------+",
-            "| 6.000000000000001 |",
-            "+-------------------+",
-        ];
-        assert_result_eq(expected, &res);
-    }
-    #[tokio::test]
-    async fn test_aggregate_stddev() {
-        let context = create_test_context().await;
-
-        let df = context
-            .sql("select STDDEV(\"id\") from test")
-            .await
-            .unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+--------------------+",
-            "| stddev(test.id)    |",
-            "+--------------------+",
-            "| 2.4494897427831783 |",
-            "+--------------------+",
-        ];
-        assert_result_eq(expected, &res);
-
-        let df = context
-            .sql("select STDDEV_SAMP(\"id\") from test")
-            .await
-            .unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+--------------------+",
-            "| stddev(test.id)    |",
-            "+--------------------+",
-            "| 2.4494897427831783 |",
-            "+--------------------+",
-        ];
-        assert_result_eq(expected, &res);
-    }
-    #[tokio::test]
-    async fn test_aggregate_covar() {
-        let context = create_test_context().await;
-
-        let df = context
-            .sql("select COVAR(id, tinyint_col) from test")
-            .await
-            .unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+--------------------------------------+",
-            "| covar_samp(test.id,test.tinyint_col) |",
-            "+--------------------------------------+",
-            "| 0.28571428571428586                  |",
-            "+--------------------------------------+",
-        ];
-        assert_result_eq(expected, &res);
-    }
-    #[tokio::test]
-    async fn test_aggregate_correlation() {
-        let context = create_test_context().await;
-
-        let df = context
-            .sql("select CORR(id, tinyint_col) from test")
-            .await
-            .unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+--------------------------------+",
-            "| corr(test.id,test.tinyint_col) |",
-            "+--------------------------------+",
-            "| 0.21821789023599245            |",
-            "+--------------------------------+",
-        ];
-        assert_result_eq(expected, &res);
-    }
-    // enable when upgrading Datafusion to > 42
-    #[ignore]
-    #[tokio::test]
-    async fn test_aggregate_approx_percentile() {
-        let context = create_test_context().await;
-
-        let df = context
-            .sql("select approx_percentile_cont_with_weight(id, 2, 0.5) from 
test")
-            .await
-            .unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            
"+-------------------------------------------------------------------+",
-            "| 
approx_percentile_cont_with_weight(test.id,Int64(2),Float64(0.5)) |",
-            
"+-------------------------------------------------------------------+",
-            "| 1                                                               
  |",
-            
"+-------------------------------------------------------------------+",
-        ];
-        assert_result_eq(expected, &res);
-
-        let df = context
-            .sql("select approx_percentile_cont(\"double_col\", 0.5) from 
test")
-            .await
-            .unwrap();
-        let res = df.collect().await.unwrap();
-        let expected = vec![
-            "+------------------------------------------------------+",
-            "| approx_percentile_cont(test.double_col,Float64(0.5)) |",
-            "+------------------------------------------------------+",
-            "| 7.574999999999999                                    |",
-            "+------------------------------------------------------+",
-        ];
-
-        assert_result_eq(expected, &res);
-    }
-
-    fn assert_result_eq(
-        expected: Vec<&str>,
-        results: &[arrow::record_batch::RecordBatch],
-    ) {
-        assert_eq!(
-            expected,
-            pretty_format_batches(results)
-                .unwrap()
-                .to_string()
-                .trim()
-                .lines()
-                .collect::<Vec<&str>>()
-        );
-    }
-    async fn create_test_context() -> BallistaContext {
-        let config = BallistaConfig::default();
-        let context = BallistaContext::standalone(&config, 4).await.unwrap();
-
-        context
-            .register_parquet(
-                "test",
-                "testdata/alltypes_plain.parquet",
-                ParquetReadOptions::default(),
-            )
-            .await
-            .unwrap();
-        context
-    }
-}
diff --git a/ballista/client/src/lib.rs b/ballista/client/src/lib.rs
index 76bd0c94..1605d405 100644
--- a/ballista/client/src/lib.rs
+++ b/ballista/client/src/lib.rs
@@ -17,6 +17,5 @@
 
 #![doc = include_str!("../README.md")]
 
-pub mod context;
 pub mod extension;
 pub mod prelude;
diff --git a/ballista/client/src/prelude.rs b/ballista/client/src/prelude.rs
index 1410476b..ca6860d9 100644
--- a/ballista/client/src/prelude.rs
+++ b/ballista/client/src/prelude.rs
@@ -17,12 +17,10 @@
 
 //! Ballista Prelude (common imports)
 
-pub use ballista_core::{
-    config::BallistaConfig,
-    error::{BallistaError, Result},
-};
+// pub use ballista_core::{
+//     config::BallistaConfig,
+//     error::{BallistaError, Result},
+// };
 
-#[allow(deprecated)] // TO BE REMOVED
-pub use crate::context::BallistaContext;
 pub use crate::extension::{SessionConfigExt, SessionContextExt};
-pub use futures::StreamExt;
+//pub use futures::StreamExt;
diff --git a/ballista/client/tests/context_standalone.rs 
b/ballista/client/tests/context_standalone.rs
index bd83d527..c17b53e5 100644
--- a/ballista/client/tests/context_standalone.rs
+++ b/ballista/client/tests/context_standalone.rs
@@ -24,10 +24,10 @@ mod common;
 #[cfg(test)]
 #[cfg(feature = "standalone")]
 mod standalone_tests {
-    use ballista::extension::SessionContextExt;
-    use ballista_core::error::Result;
+    use ballista::prelude::SessionContextExt;
     use datafusion::arrow;
     use datafusion::arrow::util::pretty::pretty_format_batches;
+    use datafusion::common::Result;
     use datafusion::config::TableParquetOptions;
     use datafusion::dataframe::DataFrameWriteOptions;
     use datafusion::prelude::ParquetReadOptions;
diff --git a/ballista/client/tests/remote.rs b/ballista/client/tests/remote.rs
index b0184b26..c03db852 100644
--- a/ballista/client/tests/remote.rs
+++ b/ballista/client/tests/remote.rs
@@ -19,7 +19,7 @@ mod common;
 
 #[cfg(test)]
 mod remote {
-    use ballista::extension::SessionContextExt;
+    use ballista::prelude::SessionContextExt;
     use datafusion::{assert_batches_eq, prelude::SessionContext};
 
     #[tokio::test]
diff --git a/ballista/client/tests/setup.rs b/ballista/client/tests/setup.rs
index d1e487b5..4dc6050d 100644
--- a/ballista/client/tests/setup.rs
+++ b/ballista/client/tests/setup.rs
@@ -19,7 +19,7 @@ mod common;
 
 #[cfg(test)]
 mod remote {
-    use ballista::extension::{SessionConfigExt, SessionContextExt};
+    use ballista::prelude::{SessionConfigExt, SessionContextExt};
     use datafusion::{
         assert_batches_eq,
         execution::SessionStateBuilder,
diff --git a/ballista/client/tests/standalone.rs 
b/ballista/client/tests/standalone.rs
index c5f519b2..fe9f3df1 100644
--- a/ballista/client/tests/standalone.rs
+++ b/ballista/client/tests/standalone.rs
@@ -20,7 +20,8 @@ mod common;
 #[cfg(test)]
 #[cfg(feature = "standalone")]
 mod standalone {
-    use ballista::{extension::SessionContextExt, prelude::*};
+    use ballista::prelude::SessionContextExt;
+    use ballista_core::config::BallistaConfig;
     use datafusion::prelude::*;
     use datafusion::{assert_batches_eq, prelude::SessionContext};
 
diff --git a/examples/examples/remote-dataframe.rs 
b/examples/examples/remote-dataframe.rs
index 74ae5f09..53fb4adf 100644
--- a/examples/examples/remote-dataframe.rs
+++ b/examples/examples/remote-dataframe.rs
@@ -18,6 +18,7 @@
 use ballista::prelude::*;
 use ballista_examples::test_util;
 use datafusion::{
+    common::Result,
     execution::SessionStateBuilder,
     prelude::{col, lit, ParquetReadOptions, SessionConfig, SessionContext},
 };
diff --git a/examples/examples/remote-sql.rs b/examples/examples/remote-sql.rs
index 673b2dd6..566abb5d 100644
--- a/examples/examples/remote-sql.rs
+++ b/examples/examples/remote-sql.rs
@@ -18,6 +18,7 @@
 use ballista::prelude::*;
 use ballista_examples::test_util;
 use datafusion::{
+    common::Result,
     execution::SessionStateBuilder,
     prelude::{CsvReadOptions, SessionConfig, SessionContext},
 };
diff --git a/examples/examples/standalone-sql.rs 
b/examples/examples/standalone-sql.rs
index 9a03bd9b..da711d30 100644
--- a/examples/examples/standalone-sql.rs
+++ b/examples/examples/standalone-sql.rs
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use ballista::prelude::{Result, SessionConfigExt, SessionContextExt};
+use ballista::prelude::{SessionConfigExt, SessionContextExt};
 use ballista_examples::test_util;
 use datafusion::{
+    common::Result,
     execution::{options::ParquetReadOptions, SessionStateBuilder},
     prelude::{SessionConfig, SessionContext},
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to