This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b59f4af5bf Add SessionContext::register_object_store (#10621)
b59f4af5bf is described below
commit b59f4af5bf6d6e41e1fb19e5282a122bed797d85
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu May 23 11:19:52 2024 -0400
Add SessionContext::register_object_store (#10621)
---
datafusion-cli/src/exec.rs | 2 +-
.../external_dependency/dataframe-to-s3.rs | 3 +--
.../examples/external_dependency/query-aws-s3.rs | 3 +--
.../examples/parquet_sql_multiple_files.rs | 2 +-
datafusion-examples/examples/query-http-csv.rs | 3 +--
.../core/src/datasource/physical_plan/avro.rs | 4 +---
.../core/src/datasource/physical_plan/csv.rs | 10 ++++-----
.../core/src/datasource/physical_plan/json.rs | 6 +++---
.../src/datasource/physical_plan/parquet/mod.rs | 4 ++--
datafusion/core/src/execution/context/mod.rs | 24 ++++++++++++++++++++++
datafusion/core/src/test/object_store.rs | 3 +--
datafusion/core/tests/dataframe/mod.rs | 4 ++--
datafusion/core/tests/path_partition.rs | 4 ++--
13 files changed, 44 insertions(+), 28 deletions(-)
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index cfbc97ecbe..ffe447e79f 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -379,7 +379,7 @@ pub(crate) async fn
register_object_store_and_config_extensions(
let store = get_object_store(&ctx.state(), scheme, url, &table_options).await?;
// Register the retrieved object store in the session context's runtime environment
- ctx.runtime_env().register_object_store(url, store);
+ ctx.register_object_store(url, store);
Ok(())
}
diff --git
a/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs
b/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs
index 8d56c440da..4d71ed7589 100644
--- a/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs
+++ b/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs
@@ -49,8 +49,7 @@ async fn main() -> Result<()> {
let path = format!("s3://{bucket_name}");
let s3_url = Url::parse(&path).unwrap();
let arc_s3 = Arc::new(s3);
- ctx.runtime_env()
- .register_object_store(&s3_url, arc_s3.clone());
+ ctx.register_object_store(&s3_url, arc_s3.clone());
let path = format!("s3://{bucket_name}/test_data/");
let file_format = ParquetFormat::default().with_enable_pruning(true);
diff --git a/datafusion-examples/examples/external_dependency/query-aws-s3.rs
b/datafusion-examples/examples/external_dependency/query-aws-s3.rs
index cbb6486b4e..e32286e30e 100644
--- a/datafusion-examples/examples/external_dependency/query-aws-s3.rs
+++ b/datafusion-examples/examples/external_dependency/query-aws-s3.rs
@@ -48,8 +48,7 @@ async fn main() -> Result<()> {
let path = format!("s3://{bucket_name}");
let s3_url = Url::parse(&path).unwrap();
- ctx.runtime_env()
- .register_object_store(&s3_url, Arc::new(s3));
+ ctx.register_object_store(&s3_url, Arc::new(s3));
// cannot query the parquet files from this bucket because the path contains a whitespace
// and we don't support that yet
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs
b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 30ca1df73d..b0d3922a32 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -80,7 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let local_fs = Arc::new(LocalFileSystem::default());
let u = url::Url::parse("file://./")?;
- ctx.runtime_env().register_object_store(&u, local_fs);
+ ctx.register_object_store(&u, local_fs);
// Register a listing table - this will use all files in the directory as
data sources
// for the query
diff --git a/datafusion-examples/examples/query-http-csv.rs
b/datafusion-examples/examples/query-http-csv.rs
index 928d702711..fa3fd2ac06 100644
--- a/datafusion-examples/examples/query-http-csv.rs
+++ b/datafusion-examples/examples/query-http-csv.rs
@@ -34,8 +34,7 @@ async fn main() -> Result<()> {
.with_url(base_url.clone())
.build()
.unwrap();
- ctx.runtime_env()
- .register_object_store(&base_url, Arc::new(http_store));
+ ctx.register_object_store(&base_url, Arc::new(http_store));
// register csv file with the execution context
ctx.register_csv(
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs
b/datafusion/core/src/datasource/physical_plan/avro.rs
index 4e5140e82d..a8a29e9bba 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -261,9 +261,7 @@ mod tests {
let state = session_ctx.state();
let url = Url::parse("file://").unwrap();
- state
- .runtime_env()
- .register_object_store(&url, store.clone());
+ session_ctx.register_object_store(&url, store.clone());
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{testdata}/avro/alltypes_plain.avro");
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs
b/datafusion/core/src/datasource/physical_plan/csv.rs
index cc7c837e47..a266b9b014 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -895,7 +895,7 @@ mod tests {
) -> Result<()> {
let ctx = SessionContext::new();
let url = Url::parse("file://").unwrap();
- ctx.runtime_env().register_object_store(&url, store.clone());
+ ctx.register_object_store(&url, store.clone());
let task_ctx = ctx.task_ctx();
@@ -968,9 +968,7 @@ mod tests {
store.put(&path, data).await.unwrap();
let url = Url::parse("memory://").unwrap();
- session_ctx
- .runtime_env()
- .register_object_store(&url, Arc::new(store));
+ session_ctx.register_object_store(&url, Arc::new(store));
let df = session_ctx
.read_csv("memory:///", CsvReadOptions::new())
@@ -999,7 +997,7 @@ mod tests {
let tmp_dir = TempDir::new()?;
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
- ctx.runtime_env().register_object_store(&local_url, local);
+ ctx.register_object_store(&local_url, local);
let options = CsvReadOptions::default()
.schema_infer_max_records(2)
.has_header(true);
@@ -1039,7 +1037,7 @@ mod tests {
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
- ctx.runtime_env().register_object_store(&local_url, local);
+ ctx.register_object_store(&local_url, local);
// execute a simple query and write the results to CSV
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs
b/datafusion/core/src/datasource/physical_plan/json.rs
index 0180caa850..4728069f19 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -446,7 +446,7 @@ mod tests {
) -> Result<()> {
let ctx = SessionContext::new();
let url = Url::parse("file://").unwrap();
- ctx.runtime_env().register_object_store(&url, store.clone());
+ ctx.register_object_store(&url, store.clone());
let filename = "1.json";
let tmp_dir = TempDir::new()?;
let file_groups = partitioned_file_groups(
@@ -752,7 +752,7 @@ mod tests {
let tmp_dir = TempDir::new()?;
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
- ctx.runtime_env().register_object_store(&local_url, local);
+ ctx.register_object_store(&local_url, local);
// execute a simple query and write the results to CSV
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
@@ -845,7 +845,7 @@ mod tests {
let tmp_dir = TempDir::new()?;
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
- ctx.runtime_env().register_object_store(&local_url, local);
+ ctx.register_object_store(&local_url, local);
let options = CsvReadOptions::default()
.schema_infer_max_records(2)
.has_header(true);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 902a284c23..410413ebd7 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -996,7 +996,7 @@ mod tests {
let tmp_dir = TempDir::new()?;
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
- ctx.runtime_env().register_object_store(&local_url, local);
+ ctx.register_object_store(&local_url, local);
let options = CsvReadOptions::default()
.schema_infer_max_records(2)
@@ -2052,7 +2052,7 @@ mod tests {
// register a local file system object store for /tmp directory
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
- ctx.runtime_env().register_object_store(&local_url, local);
+ ctx.register_object_store(&local_url, local);
// Configure listing options
let file_format = ParquetFormat::default().with_enable_pruning(true);
diff --git a/datafusion/core/src/execution/context/mod.rs
b/datafusion/core/src/execution/context/mod.rs
index 59238ffc55..d23982cfa9 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -91,6 +91,7 @@ use sqlparser::dialect::dialect_from_str;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
+use object_store::ObjectStore;
use parking_lot::RwLock;
use url::Url;
use uuid::Uuid;
@@ -354,6 +355,29 @@ impl SessionContext {
self
}
+ /// Registers an [`ObjectStore`] to be used with a specific URL prefix.
+ ///
+ /// See [`RuntimeEnv::register_object_store`] for more details.
+ ///
+ /// # Example: register a local object store for the "file://" URL prefix
+ /// ```
+ /// # use std::sync::Arc;
+ /// # use datafusion::prelude::SessionContext;
+ /// # use datafusion_execution::object_store::ObjectStoreUrl;
+ /// let object_store_url = ObjectStoreUrl::parse("file://").unwrap();
+ /// let object_store = object_store::local::LocalFileSystem::new();
+ /// let mut ctx = SessionContext::new();
+ /// // All files with the file:// url prefix will be read from the local file system
+ /// ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store));
+ /// ```
+ pub fn register_object_store(
+ &self,
+ url: &Url,
+ object_store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>> {
+ self.runtime_env().register_object_store(url, object_store)
+ }
+
/// Registers the [`RecordBatch`] as the specified table name
pub fn register_batch(
&self,
diff --git a/datafusion/core/src/test/object_store.rs
b/datafusion/core/src/test/object_store.rs
index d6f324a7f1..bea6f7b9ce 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -27,8 +27,7 @@ use url::Url;
/// Returns a test object store with the provided `ctx`
pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
let url = Url::parse("test://").unwrap();
- ctx.runtime_env()
- .register_object_store(&url, make_test_store_and_state(files).0);
+ ctx.register_object_store(&url, make_test_store_and_state(files).0);
}
/// Create a test object store with the provided files
diff --git a/datafusion/core/tests/dataframe/mod.rs
b/datafusion/core/tests/dataframe/mod.rs
index 9b7cb85614..60e60bb1e3 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -2070,7 +2070,7 @@ async fn write_partitioned_parquet_results() ->
Result<()> {
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
- ctx.runtime_env().register_object_store(&local_url, local);
+ ctx.register_object_store(&local_url, local);
// execute a simple query and write the results to parquet
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
@@ -2140,7 +2140,7 @@ async fn write_parquet_results() -> Result<()> {
// register a local file system object store for /tmp directory
let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
let local_url = Url::parse("file://local").unwrap();
- ctx.runtime_env().register_object_store(&local_url, local);
+ ctx.register_object_store(&local_url, local);
// execute a simple query and write the results to parquet
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
diff --git a/datafusion/core/tests/path_partition.rs
b/datafusion/core/tests/path_partition.rs
index ffe0494dae..ce71c89069 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -533,7 +533,7 @@ fn register_partitioned_aggregate_csv(
let csv_file_path = format!("{testdata}/csv/aggregate_test_100.csv");
let file_schema = test_util::aggr_test_schema();
let url = Url::parse("mirror://").unwrap();
- ctx.runtime_env().register_object_store(
+ ctx.register_object_store(
&url,
MirroringObjectStore::new_arc(csv_file_path, store_paths),
);
@@ -566,7 +566,7 @@ async fn register_partitioned_alltypes_parquet(
let testdata = parquet_test_data();
let parquet_file_path = format!("{testdata}/{source_file}");
let url = Url::parse("mirror://").unwrap();
- ctx.runtime_env().register_object_store(
+ ctx.register_object_store(
&url,
MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]