This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 73b1f2bd72 Adds Instrument Mode for InstrumentedObjectStore in datafusion-cli (#18000)
73b1f2bd72 is described below

commit 73b1f2bd7279bd856e35420beadaf2879c4aa3fc
Author: Blake Orth <[email protected]>
AuthorDate: Thu Oct 9 14:00:56 2025 -0600

    Adds Instrument Mode for InstrumentedObjectStore in datafusion-cli (#18000)
    
     - Adds a mode type to datafusion-cli that allows changing the mode of an
       InstrumentedObjectStore
     - Implements string parsing and u8 conversion for
       InstrumentedObjectStoreMode
     - Adds tests to validate trait implementations
---
 datafusion-cli/src/main.rs                        |  11 ++-
 datafusion-cli/src/object_storage/instrumented.rs | 107 ++++++++++++++++++++--
 2 files changed, 104 insertions(+), 14 deletions(-)

diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 39aca4cd13..04a8c2a0f1 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -33,7 +33,9 @@ use datafusion::logical_expr::ExplainFormat;
 use datafusion::prelude::SessionContext;
 use datafusion_cli::catalog::DynamicObjectStoreCatalog;
 use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
-use datafusion_cli::object_storage::instrumented::InstrumentedObjectStoreRegistry;
+use datafusion_cli::object_storage::instrumented::{
+    InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
+};
 use datafusion_cli::{
     exec,
     pool_type::PoolType,
@@ -208,9 +210,10 @@ async fn main_inner() -> Result<()> {
         rt_builder = rt_builder.with_disk_manager_builder(builder);
     }
 
-    let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
-        DefaultObjectStoreRegistry::new(),
-    )));
+    let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(
+        Arc::new(DefaultObjectStoreRegistry::new()),
+        InstrumentedObjectStoreMode::default(),
+    ));
     rt_builder = rt_builder.with_object_store_registry(instrumented_registry.clone());
 
     let runtime_env = rt_builder.build_arc()?;
diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs
index c0802faf88..f0313da3a3 100644
--- a/datafusion-cli/src/object_storage/instrumented.rs
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -15,10 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{fmt, sync::Arc};
+use std::{
+    fmt,
+    str::FromStr,
+    sync::{
+        atomic::{AtomicU8, Ordering},
+        Arc,
+    },
+};
 
 use async_trait::async_trait;
-use datafusion::execution::object_store::ObjectStoreRegistry;
+use datafusion::{error::DataFusionError, execution::object_store::ObjectStoreRegistry};
 use futures::stream::BoxStream;
 use object_store::{
     path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
@@ -26,25 +33,72 @@ use object_store::{
 };
 use url::Url;
 
+/// The profiling mode to use for an [`ObjectStore`] instance that has been instrumented to collect
+/// profiling data. Collecting profiling data will have a small negative impact on both CPU and
+/// memory usage. Default is `Disabled`
+#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
+pub enum InstrumentedObjectStoreMode {
+    /// Disable collection of profiling data
+    #[default]
+    Disabled,
+    /// Enable collection of profiling data
+    Enabled,
+}
+
+impl fmt::Display for InstrumentedObjectStoreMode {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl FromStr for InstrumentedObjectStoreMode {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "disabled" => Ok(Self::Disabled),
+            "enabled" => Ok(Self::Enabled),
+            _ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))),
+        }
+    }
+}
+
+impl From<u8> for InstrumentedObjectStoreMode {
+    fn from(value: u8) -> Self {
+        match value {
+            1 => InstrumentedObjectStoreMode::Enabled,
+            _ => InstrumentedObjectStoreMode::Disabled,
+        }
+    }
+}
+
 /// Wrapped [`ObjectStore`] instances that record information for reporting on the usage of the
 /// inner [`ObjectStore`]
 #[derive(Debug)]
 struct InstrumentedObjectStore {
     inner: Arc<dyn ObjectStore>,
+    instrument_mode: AtomicU8,
 }
 
 impl InstrumentedObjectStore {
     /// Returns a new [`InstrumentedObjectStore`] that wraps the provided [`ObjectStore`]
-    fn new(object_store: Arc<dyn ObjectStore>) -> Self {
+    fn new(object_store: Arc<dyn ObjectStore>, instrument_mode: AtomicU8) -> Self {
         Self {
             inner: object_store,
+            instrument_mode,
         }
     }
 }
 
 impl fmt::Display for InstrumentedObjectStore {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "Instrumented Object Store: {}", self.inner)
+        let mode: InstrumentedObjectStoreMode =
+            self.instrument_mode.load(Ordering::Relaxed).into();
+        write!(
+            f,
+            "Instrumented Object Store: instrument_mode: {mode}, inner: {}",
+            self.inner
+        )
     }
 }
 
@@ -100,13 +154,20 @@ impl ObjectStore for InstrumentedObjectStore {
 #[derive(Debug)]
 pub struct InstrumentedObjectStoreRegistry {
     inner: Arc<dyn ObjectStoreRegistry>,
+    instrument_mode: InstrumentedObjectStoreMode,
 }
 
 impl InstrumentedObjectStoreRegistry {
     /// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the provided
     /// [`ObjectStoreRegistry`]
-    pub fn new(registry: Arc<dyn ObjectStoreRegistry>) -> Self {
-        Self { inner: registry }
+    pub fn new(
+        registry: Arc<dyn ObjectStoreRegistry>,
+        default_mode: InstrumentedObjectStoreMode,
+    ) -> Self {
+        Self {
+            inner: registry,
+            instrument_mode: default_mode,
+        }
     }
 }
 
@@ -116,7 +177,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
         url: &Url,
         store: Arc<dyn ObjectStore>,
     ) -> Option<Arc<dyn ObjectStore>> {
-        let instrumented = Arc::new(InstrumentedObjectStore::new(store));
+        let mode = AtomicU8::new(self.instrument_mode as u8);
+        let instrumented = Arc::new(InstrumentedObjectStore::new(store, mode));
         self.inner.register_store(url, instrumented)
     }
 
@@ -131,11 +193,36 @@ mod tests {
 
     use super::*;
 
+    #[test]
+    fn instrumented_mode() {
+        assert!(matches!(
+            InstrumentedObjectStoreMode::default(),
+            InstrumentedObjectStoreMode::Disabled
+        ));
+
+        assert!(matches!(
+            "dIsABleD".parse().unwrap(),
+            InstrumentedObjectStoreMode::Disabled
+        ));
+        assert!(matches!(
+            "EnABlEd".parse().unwrap(),
+            InstrumentedObjectStoreMode::Enabled
+        ));
+        assert!("does_not_exist"
+            .parse::<InstrumentedObjectStoreMode>()
+            .is_err());
+
+        assert!(matches!(0.into(), InstrumentedObjectStoreMode::Disabled));
+        assert!(matches!(1.into(), InstrumentedObjectStoreMode::Enabled));
+        assert!(matches!(2.into(), InstrumentedObjectStoreMode::Disabled));
+    }
+
     #[test]
     fn instrumented_registry() {
-        let reg = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
-            DefaultObjectStoreRegistry::new(),
-        )));
+        let reg = Arc::new(InstrumentedObjectStoreRegistry::new(
+            Arc::new(DefaultObjectStoreRegistry::new()),
+            InstrumentedObjectStoreMode::default(),
+        ));
         let store = object_store::memory::InMemory::new();
 
         let url = "mem://test".parse().unwrap();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to