This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/master by this push:
new 1b29c02c Add support for S3 data sources (#290)
1b29c02c is described below
commit 1b29c02c59f670868f1146a485d699ea4a221ffd
Author: Andy Grove <[email protected]>
AuthorDate: Wed Sep 28 22:50:27 2022 -0600
Add support for S3 data sources (#290)
---
ballista-cli/Cargo.toml | 3 +++
ballista-cli/src/main.rs | 14 +++++++-------
ballista/rust/client/Cargo.toml | 1 +
ballista/rust/core/Cargo.toml | 1 +
ballista/rust/core/src/utils.rs | 14 ++++++++++++++
5 files changed, 26 insertions(+), 7 deletions(-)
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index b99c924c..3ad38a20 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -38,3 +38,6 @@ env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
rustyline = "10.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
+
+[features]
+s3 = ["ballista/s3"]
diff --git a/ballista-cli/src/main.rs b/ballista-cli/src/main.rs
index 69868091..11f42166 100644
--- a/ballista-cli/src/main.rs
+++ b/ballista-cli/src/main.rs
@@ -98,15 +98,15 @@ pub async fn main() -> Result<()> {
env::set_current_dir(&p).unwrap();
};
- let mut session_config = SessionConfig::new().with_information_schema(true);
-
- if let Some(batch_size) = args.batch_size {
- session_config = session_config.with_batch_size(batch_size);
- };
-
let mut ctx: Context = match (args.host, args.port) {
(Some(ref h), Some(p)) => Context::new_remote(h, p).await?,
- _ => Context::new_local(&session_config),
+ _ => {
+ let mut session_config = SessionConfig::new().with_information_schema(true);
+ if let Some(batch_size) = args.batch_size {
+ session_config = session_config.with_batch_size(batch_size);
+ };
+ Context::new_local(&session_config)
+ }
};
let mut print_options = PrintOptions {
diff --git a/ballista/rust/client/Cargo.toml b/ballista/rust/client/Cargo.toml
index 6850ab38..a7ab2164 100644
--- a/ballista/rust/client/Cargo.toml
+++ b/ballista/rust/client/Cargo.toml
@@ -43,4 +43,5 @@ tokio = "1.0"
[features]
default = []
hdfs = ["ballista-core/hdfs"]
+s3 = ["ballista-core/s3"]
standalone = ["ballista-executor", "ballista-scheduler"]
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index c234dfa3..6699af01 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -35,6 +35,7 @@ rustc-args = ["--cfg", "docsrs"]
force_hash_collisions = ["datafusion/force_hash_collisions"]
# Used to enable hdfs to be registered in the ObjectStoreRegistry by default
hdfs = ["datafusion-objectstore-hdfs"]
+s3 = ["object_store/aws"]
simd = ["datafusion/simd"]
[dependencies]
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 84c3664f..20f8b927 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -49,6 +49,8 @@ use datafusion_proto::logical_plan::{
AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
};
use futures::StreamExt;
+#[cfg(feature = "s3")]
+use object_store::aws::AmazonS3Builder;
use object_store::ObjectStore;
use std::io::{BufWriter, Write};
use std::marker::PhantomData;
@@ -94,6 +96,18 @@ impl ObjectStoreProvider for FeatureBasedObjectStoreProvider {
}
}
+ #[cfg(feature = "s3")]
+ {
+ if url.to_string().starts_with("s3://") {
+ if let Some(bucket_name) = url.host_str() {
+ let store = AmazonS3Builder::from_env()
+ .with_bucket_name(bucket_name)
+ .build()?;
+ return Ok(Arc::new(store));
+ }
+ }
+ }
+
Err(DataFusionError::Execution(format!(
"No object store available for {}",
url