This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b29c02c Add support for S3 data sources (#290)
1b29c02c is described below

commit 1b29c02c59f670868f1146a485d699ea4a221ffd
Author: Andy Grove <[email protected]>
AuthorDate: Wed Sep 28 22:50:27 2022 -0600

    Add support for S3 data sources (#290)
---
 ballista-cli/Cargo.toml         |  3 +++
 ballista-cli/src/main.rs        | 14 +++++++-------
 ballista/rust/client/Cargo.toml |  1 +
 ballista/rust/core/Cargo.toml   |  1 +
 ballista/rust/core/src/utils.rs | 14 ++++++++++++++
 5 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index b99c924c..3ad38a20 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -38,3 +38,6 @@ env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
 rustyline = "10.0"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
+
+[features]
+s3 = ["ballista/s3"]
diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs
index 69868091..11f42166 100644
--- a/ballista-cli/src/main.rs
+++ b/ballista-cli/src/main.rs
@@ -98,15 +98,15 @@ pub async fn main() -> Result<()> {
         env::set_current_dir(&p).unwrap();
     };
 
-    let mut session_config = SessionConfig::new().with_information_schema(true);
-
-    if let Some(batch_size) = args.batch_size {
-        session_config = session_config.with_batch_size(batch_size);
-    };
-
     let mut ctx: Context = match (args.host, args.port) {
         (Some(ref h), Some(p)) => Context::new_remote(h, p).await?,
-        _ => Context::new_local(&session_config),
+        _ => {
+            let mut session_config = SessionConfig::new().with_information_schema(true);
+            if let Some(batch_size) = args.batch_size {
+                session_config = session_config.with_batch_size(batch_size);
+            };
+            Context::new_local(&session_config)
+        }
     };
 
     let mut print_options = PrintOptions {
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 6850ab38..a7ab2164 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -43,4 +43,5 @@ tokio = "1.0"
 [features]
 default = []
 hdfs = ["ballista-core/hdfs"]
+s3 = ["ballista-core/s3"]
 standalone = ["ballista-executor", "ballista-scheduler"]
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index c234dfa3..6699af01 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -35,6 +35,7 @@ rustc-args = ["--cfg", "docsrs"]
 force_hash_collisions = ["datafusion/force_hash_collisions"]
 # Used to enable hdfs to be registered in the ObjectStoreRegistry by default
 hdfs = ["datafusion-objectstore-hdfs"]
+s3 = ["object_store/aws"]
 simd = ["datafusion/simd"]
 
 [dependencies]
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 84c3664f..20f8b927 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -49,6 +49,8 @@ use datafusion_proto::logical_plan::{
     AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
 };
 use futures::StreamExt;
+#[cfg(feature = "s3")]
+use object_store::aws::AmazonS3Builder;
 use object_store::ObjectStore;
 use std::io::{BufWriter, Write};
 use std::marker::PhantomData;
@@ -94,6 +96,18 @@ impl ObjectStoreProvider for FeatureBasedObjectStoreProvider {
             }
         }
 
+        #[cfg(feature = "s3")]
+        {
+            if url.to_string().starts_with("s3://") {
+                if let Some(bucket_name) = url.host_str() {
+                    let store = AmazonS3Builder::from_env()
+                        .with_bucket_name(bucket_name)
+                        .build()?;
+                    return Ok(Arc::new(store));
+                }
+            }
+        }
+
         Err(DataFusionError::Execution(format!(
             "No object store available for {}",
             url

Reply via email to