This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b59f4af5bf Add SessionContext::register_object_store (#10621)
b59f4af5bf is described below

commit b59f4af5bf6d6e41e1fb19e5282a122bed797d85
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu May 23 11:19:52 2024 -0400

    Add SessionContext::register_object_store (#10621)
---
 datafusion-cli/src/exec.rs                         |  2 +-
 .../external_dependency/dataframe-to-s3.rs         |  3 +--
 .../examples/external_dependency/query-aws-s3.rs   |  3 +--
 .../examples/parquet_sql_multiple_files.rs         |  2 +-
 datafusion-examples/examples/query-http-csv.rs     |  3 +--
 .../core/src/datasource/physical_plan/avro.rs      |  4 +---
 .../core/src/datasource/physical_plan/csv.rs       | 10 ++++-----
 .../core/src/datasource/physical_plan/json.rs      |  6 +++---
 .../src/datasource/physical_plan/parquet/mod.rs    |  4 ++--
 datafusion/core/src/execution/context/mod.rs       | 24 ++++++++++++++++++++++
 datafusion/core/src/test/object_store.rs           |  3 +--
 datafusion/core/tests/dataframe/mod.rs             |  4 ++--
 datafusion/core/tests/path_partition.rs            |  4 ++--
 13 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index cfbc97ecbe..ffe447e79f 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -379,7 +379,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
     let store = get_object_store(&ctx.state(), scheme, url, &table_options).await?;
 
     // Register the retrieved object store in the session context's runtime environment
-    ctx.runtime_env().register_object_store(url, store);
+    ctx.register_object_store(url, store);
 
     Ok(())
 }
diff --git a/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs b/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs
index 8d56c440da..4d71ed7589 100644
--- a/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs
+++ b/datafusion-examples/examples/external_dependency/dataframe-to-s3.rs
@@ -49,8 +49,7 @@ async fn main() -> Result<()> {
     let path = format!("s3://{bucket_name}");
     let s3_url = Url::parse(&path).unwrap();
     let arc_s3 = Arc::new(s3);
-    ctx.runtime_env()
-        .register_object_store(&s3_url, arc_s3.clone());
+    ctx.register_object_store(&s3_url, arc_s3.clone());
 
     let path = format!("s3://{bucket_name}/test_data/");
     let file_format = ParquetFormat::default().with_enable_pruning(true);
diff --git a/datafusion-examples/examples/external_dependency/query-aws-s3.rs b/datafusion-examples/examples/external_dependency/query-aws-s3.rs
index cbb6486b4e..e32286e30e 100644
--- a/datafusion-examples/examples/external_dependency/query-aws-s3.rs
+++ b/datafusion-examples/examples/external_dependency/query-aws-s3.rs
@@ -48,8 +48,7 @@ async fn main() -> Result<()> {
 
     let path = format!("s3://{bucket_name}");
     let s3_url = Url::parse(&path).unwrap();
-    ctx.runtime_env()
-        .register_object_store(&s3_url, Arc::new(s3));
+    ctx.register_object_store(&s3_url, Arc::new(s3));
 
     // cannot query the parquet files from this bucket because the path contains a whitespace
     // and we don't support that yet
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 30ca1df73d..b0d3922a32 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -80,7 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let local_fs = Arc::new(LocalFileSystem::default());
 
     let u = url::Url::parse("file://./")?;
-    ctx.runtime_env().register_object_store(&u, local_fs);
+    ctx.register_object_store(&u, local_fs);
 
     // Register a listing table - this will use all files in the directory as 
data sources
     // for the query
diff --git a/datafusion-examples/examples/query-http-csv.rs b/datafusion-examples/examples/query-http-csv.rs
index 928d702711..fa3fd2ac06 100644
--- a/datafusion-examples/examples/query-http-csv.rs
+++ b/datafusion-examples/examples/query-http-csv.rs
@@ -34,8 +34,7 @@ async fn main() -> Result<()> {
         .with_url(base_url.clone())
         .build()
         .unwrap();
-    ctx.runtime_env()
-        .register_object_store(&base_url, Arc::new(http_store));
+    ctx.register_object_store(&base_url, Arc::new(http_store));
 
     // register csv file with the execution context
     ctx.register_csv(
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index 4e5140e82d..a8a29e9bba 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -261,9 +261,7 @@ mod tests {
         let state = session_ctx.state();
 
         let url = Url::parse("file://").unwrap();
-        state
-            .runtime_env()
-            .register_object_store(&url, store.clone());
+        session_ctx.register_object_store(&url, store.clone());
 
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{testdata}/avro/alltypes_plain.avro");
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index cc7c837e47..a266b9b014 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -895,7 +895,7 @@ mod tests {
     ) -> Result<()> {
         let ctx = SessionContext::new();
         let url = Url::parse("file://").unwrap();
-        ctx.runtime_env().register_object_store(&url, store.clone());
+        ctx.register_object_store(&url, store.clone());
 
         let task_ctx = ctx.task_ctx();
 
@@ -968,9 +968,7 @@ mod tests {
         store.put(&path, data).await.unwrap();
 
         let url = Url::parse("memory://").unwrap();
-        session_ctx
-            .runtime_env()
-            .register_object_store(&url, Arc::new(store));
+        session_ctx.register_object_store(&url, Arc::new(store));
 
         let df = session_ctx
             .read_csv("memory:///", CsvReadOptions::new())
@@ -999,7 +997,7 @@ mod tests {
         let tmp_dir = TempDir::new()?;
         let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
         let local_url = Url::parse("file://local").unwrap();
-        ctx.runtime_env().register_object_store(&local_url, local);
+        ctx.register_object_store(&local_url, local);
         let options = CsvReadOptions::default()
             .schema_infer_max_records(2)
             .has_header(true);
@@ -1039,7 +1037,7 @@ mod tests {
         let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
         let local_url = Url::parse("file://local").unwrap();
 
-        ctx.runtime_env().register_object_store(&local_url, local);
+        ctx.register_object_store(&local_url, local);
 
         // execute a simple query and write the results to CSV
         let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 0180caa850..4728069f19 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -446,7 +446,7 @@ mod tests {
     ) -> Result<()> {
         let ctx = SessionContext::new();
         let url = Url::parse("file://").unwrap();
-        ctx.runtime_env().register_object_store(&url, store.clone());
+        ctx.register_object_store(&url, store.clone());
         let filename = "1.json";
         let tmp_dir = TempDir::new()?;
         let file_groups = partitioned_file_groups(
@@ -752,7 +752,7 @@ mod tests {
         let tmp_dir = TempDir::new()?;
         let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
         let local_url = Url::parse("file://local").unwrap();
-        ctx.runtime_env().register_object_store(&local_url, local);
+        ctx.register_object_store(&local_url, local);
 
         // execute a simple query and write the results to CSV
         let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
@@ -845,7 +845,7 @@ mod tests {
         let tmp_dir = TempDir::new()?;
         let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
         let local_url = Url::parse("file://local").unwrap();
-        ctx.runtime_env().register_object_store(&local_url, local);
+        ctx.register_object_store(&local_url, local);
         let options = CsvReadOptions::default()
             .schema_infer_max_records(2)
             .has_header(true);
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 902a284c23..410413ebd7 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -996,7 +996,7 @@ mod tests {
         let tmp_dir = TempDir::new()?;
         let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
         let local_url = Url::parse("file://local").unwrap();
-        ctx.runtime_env().register_object_store(&local_url, local);
+        ctx.register_object_store(&local_url, local);
 
         let options = CsvReadOptions::default()
             .schema_infer_max_records(2)
@@ -2052,7 +2052,7 @@ mod tests {
         // register a local file system object store for /tmp directory
         let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
         let local_url = Url::parse("file://local").unwrap();
-        ctx.runtime_env().register_object_store(&local_url, local);
+        ctx.register_object_store(&local_url, local);
 
         // Configure listing options
         let file_format = ParquetFormat::default().with_enable_pruning(true);
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 59238ffc55..d23982cfa9 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -91,6 +91,7 @@ use sqlparser::dialect::dialect_from_str;
 
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
+use object_store::ObjectStore;
 use parking_lot::RwLock;
 use url::Url;
 use uuid::Uuid;
@@ -354,6 +355,29 @@ impl SessionContext {
         self
     }
 
+    /// Registers an [`ObjectStore`] to be used with a specific URL prefix.
+    ///
+    /// See [`RuntimeEnv::register_object_store`] for more details.
+    ///
+    /// # Example: register a local object store for the "file://" URL prefix
+    /// ```
+    /// # use std::sync::Arc;
+    /// # use datafusion::prelude::SessionContext;
+    /// # use datafusion_execution::object_store::ObjectStoreUrl;
+    /// let object_store_url = ObjectStoreUrl::parse("file://").unwrap();
+    /// let object_store = object_store::local::LocalFileSystem::new();
+    /// let mut ctx = SessionContext::new();
+    /// // All files with the file:// url prefix will be read from the local file system
+    /// ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store));
+    /// ```
+    pub fn register_object_store(
+        &self,
+        url: &Url,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        self.runtime_env().register_object_store(url, object_store)
+    }
+
     /// Registers the [`RecordBatch`] as the specified table name
     pub fn register_batch(
         &self,
diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs
index d6f324a7f1..bea6f7b9ce 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -27,8 +27,7 @@ use url::Url;
 /// Returns a test object store with the provided `ctx`
 pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
     let url = Url::parse("test://").unwrap();
-    ctx.runtime_env()
-        .register_object_store(&url, make_test_store_and_state(files).0);
+    ctx.register_object_store(&url, make_test_store_and_state(files).0);
 }
 
 /// Create a test object store with the provided files
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index 9b7cb85614..60e60bb1e3 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -2070,7 +2070,7 @@ async fn write_partitioned_parquet_results() -> Result<()> {
 
     let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
     let local_url = Url::parse("file://local").unwrap();
-    ctx.runtime_env().register_object_store(&local_url, local);
+    ctx.register_object_store(&local_url, local);
 
     // execute a simple query and write the results to parquet
     let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
@@ -2140,7 +2140,7 @@ async fn write_parquet_results() -> Result<()> {
     // register a local file system object store for /tmp directory
     let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
     let local_url = Url::parse("file://local").unwrap();
-    ctx.runtime_env().register_object_store(&local_url, local);
+    ctx.register_object_store(&local_url, local);
 
     // execute a simple query and write the results to parquet
     let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index ffe0494dae..ce71c89069 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -533,7 +533,7 @@ fn register_partitioned_aggregate_csv(
     let csv_file_path = format!("{testdata}/csv/aggregate_test_100.csv");
     let file_schema = test_util::aggr_test_schema();
     let url = Url::parse("mirror://").unwrap();
-    ctx.runtime_env().register_object_store(
+    ctx.register_object_store(
         &url,
         MirroringObjectStore::new_arc(csv_file_path, store_paths),
     );
@@ -566,7 +566,7 @@ async fn register_partitioned_alltypes_parquet(
     let testdata = parquet_test_data();
     let parquet_file_path = format!("{testdata}/{source_file}");
     let url = Url::parse("mirror://").unwrap();
-    ctx.runtime_env().register_object_store(
+    ctx.register_object_store(
         &url,
         MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
     );


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to