This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 73b1f2bd72 Adds Instrument Mode for InstrumentedObjectStore in
datafusion-cli (#18000)
73b1f2bd72 is described below
commit 73b1f2bd7279bd856e35420beadaf2879c4aa3fc
Author: Blake Orth <[email protected]>
AuthorDate: Thu Oct 9 14:00:56 2025 -0600
Adds Instrument Mode for InstrumentedObjectStore in datafusion-cli (#18000)
- Adds a mode type to datafusion-cli that allows changing the mode of an
InstrumentedObjectStore
- Implements string parsing and u8 conversion for
InstrumentedObjectStoreMode
- Adds tests to validate trait implementations
---
datafusion-cli/src/main.rs | 11 ++-
datafusion-cli/src/object_storage/instrumented.rs | 107 ++++++++++++++++++++--
2 files changed, 104 insertions(+), 14 deletions(-)
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 39aca4cd13..04a8c2a0f1 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -33,7 +33,9 @@ use datafusion::logical_expr::ExplainFormat;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
-use
datafusion_cli::object_storage::instrumented::InstrumentedObjectStoreRegistry;
+use datafusion_cli::object_storage::instrumented::{
+ InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
+};
use datafusion_cli::{
exec,
pool_type::PoolType,
@@ -208,9 +210,10 @@ async fn main_inner() -> Result<()> {
rt_builder = rt_builder.with_disk_manager_builder(builder);
}
- let instrumented_registry =
Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
- DefaultObjectStoreRegistry::new(),
- )));
+ let instrumented_registry = Arc::new(InstrumentedObjectStoreRegistry::new(
+ Arc::new(DefaultObjectStoreRegistry::new()),
+ InstrumentedObjectStoreMode::default(),
+ ));
rt_builder =
rt_builder.with_object_store_registry(instrumented_registry.clone());
let runtime_env = rt_builder.build_arc()?;
diff --git a/datafusion-cli/src/object_storage/instrumented.rs
b/datafusion-cli/src/object_storage/instrumented.rs
index c0802faf88..f0313da3a3 100644
--- a/datafusion-cli/src/object_storage/instrumented.rs
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -15,10 +15,17 @@
// specific language governing permissions and limitations
// under the License.
-use std::{fmt, sync::Arc};
+use std::{
+ fmt,
+ str::FromStr,
+ sync::{
+ atomic::{AtomicU8, Ordering},
+ Arc,
+ },
+};
use async_trait::async_trait;
-use datafusion::execution::object_store::ObjectStoreRegistry;
+use datafusion::{error::DataFusionError,
execution::object_store::ObjectStoreRegistry};
use futures::stream::BoxStream;
use object_store::{
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
@@ -26,25 +33,72 @@ use object_store::{
};
use url::Url;
+/// The profiling mode to use for an [`ObjectStore`] instance that has been instrumented to
+/// collect profiling data. Collecting profiling data will have a small negative impact on both
+/// CPU and memory usage. Default is `Disabled`.
+///
+/// The explicit discriminants are the `u8` encoding used when a mode is stored in an
+/// instrumented store's `AtomicU8` (`mode as u8`). They must stay in sync with
+/// `From<u8> for InstrumentedObjectStoreMode`, which decodes that value.
+#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
+#[repr(u8)]
+pub enum InstrumentedObjectStoreMode {
+    /// Disable collection of profiling data
+    #[default]
+    Disabled = 0,
+    /// Enable collection of profiling data
+    Enabled = 1,
+}
+
+impl fmt::Display for InstrumentedObjectStoreMode {
+    /// Writes the variant name (`Disabled` or `Enabled`), matching the derived `Debug` output.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let label = match self {
+            Self::Disabled => "Disabled",
+            Self::Enabled => "Enabled",
+        };
+        f.write_str(label)
+    }
+}
+
+impl FromStr for InstrumentedObjectStoreMode {
+    type Err = DataFusionError;
+
+    /// Parses a mode name case-insensitively: `"disabled"` or `"enabled"`.
+    ///
+    /// # Errors
+    /// Returns [`DataFusionError::Execution`] naming the accepted values when `s` is not a
+    /// recognized mode.
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        // Both mode names are pure ASCII, so an ASCII case-insensitive comparison is
+        // sufficient and avoids allocating a lowercased copy of the input.
+        if s.eq_ignore_ascii_case("disabled") {
+            Ok(Self::Disabled)
+        } else if s.eq_ignore_ascii_case("enabled") {
+            Ok(Self::Enabled)
+        } else {
+            Err(DataFusionError::Execution(format!(
+                "Unrecognized mode {s}, expected one of: disabled, enabled"
+            )))
+        }
+    }
+}
+
+impl From<u8> for InstrumentedObjectStoreMode {
+    /// Decodes the `u8` produced by `mode as u8`, e.g. the value loaded from an
+    /// instrumented store's `AtomicU8`.
+    ///
+    /// Any value other than the `Enabled` discriminant decodes to `Disabled`, so an
+    /// unexpected value turns profiling off rather than on.
+    fn from(value: u8) -> Self {
+        // Compare against the enum's own discriminant, so encoding (`as u8`) and decoding
+        // cannot drift apart if the variants are ever renumbered.
+        if value == Self::Enabled as u8 {
+            Self::Enabled
+        } else {
+            Self::Disabled
+        }
+    }
+}
+
/// Wrapped [`ObjectStore`] instances that record information for reporting on
the usage of the
/// inner [`ObjectStore`]
#[derive(Debug)]
struct InstrumentedObjectStore {
inner: Arc<dyn ObjectStore>,
+ /// The [`InstrumentedObjectStoreMode`] for this store, encoded as a `u8` (`mode as u8`)
+ /// and decoded with `From<u8>`.
+ instrument_mode: AtomicU8,
}
impl InstrumentedObjectStore {
/// Returns a new [`InstrumentedObjectStore`] that wraps the provided
[`ObjectStore`]
+ ///
+ /// `instrument_mode` holds an [`InstrumentedObjectStoreMode`] encoded as a `u8`
+ /// (decoded with `From<u8>` when read).
- fn new(object_store: Arc<dyn ObjectStore>) -> Self {
+ fn new(object_store: Arc<dyn ObjectStore>, instrument_mode: AtomicU8) ->
Self {
Self {
inner: object_store,
+ instrument_mode,
}
}
}
impl fmt::Display for InstrumentedObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "Instrumented Object Store: {}", self.inner)
+ // Decode the stored u8 back into a mode for display. A Relaxed load is presumably
+ // enough because the value is only formatted here. NOTE(review): confirm no ordering
+ // with other memory is relied on.
+ let mode: InstrumentedObjectStoreMode =
+ self.instrument_mode.load(Ordering::Relaxed).into();
+ write!(
+ f,
+ "Instrumented Object Store: instrument_mode: {mode}, inner: {}",
+ self.inner
+ )
}
}
@@ -100,13 +154,20 @@ impl ObjectStore for InstrumentedObjectStore {
#[derive(Debug)]
pub struct InstrumentedObjectStoreRegistry {
inner: Arc<dyn ObjectStoreRegistry>,
+ /// Mode applied to each store wrapped by `register_store`
+ instrument_mode: InstrumentedObjectStoreMode,
}
impl InstrumentedObjectStoreRegistry {
/// Returns a new [`InstrumentedObjectStoreRegistry`] that wraps the
provided
/// [`ObjectStoreRegistry`]
+ ///
+ /// Every store subsequently registered through this registry is wrapped in an
+ /// instrumented store using `default_mode`.
- pub fn new(registry: Arc<dyn ObjectStoreRegistry>) -> Self {
- Self { inner: registry }
+ pub fn new(
+ registry: Arc<dyn ObjectStoreRegistry>,
+ default_mode: InstrumentedObjectStoreMode,
+ ) -> Self {
+ Self {
+ inner: registry,
+ instrument_mode: default_mode,
+ }
}
}
@@ -116,7 +177,8 @@ impl ObjectStoreRegistry for
InstrumentedObjectStoreRegistry {
url: &Url,
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
- let instrumented = Arc::new(InstrumentedObjectStore::new(store));
+ let mode = AtomicU8::new(self.instrument_mode as u8);
+ let instrumented = Arc::new(InstrumentedObjectStore::new(store, mode));
self.inner.register_store(url, instrumented)
}
@@ -131,11 +193,36 @@ mod tests {
use super::*;
+    /// Checks the `Default`, `FromStr` and `From<u8>` implementations of
+    /// `InstrumentedObjectStoreMode`.
+    #[test]
+    fn instrumented_mode() {
+        type Mode = InstrumentedObjectStoreMode;
+
+        // The default mode keeps profiling off.
+        assert_eq!(Mode::default(), Mode::Disabled);
+
+        // Parsing ignores case; unknown names are rejected.
+        assert_eq!("dIsABleD".parse::<Mode>().unwrap(), Mode::Disabled);
+        assert_eq!("EnABlEd".parse::<Mode>().unwrap(), Mode::Enabled);
+        assert!("does_not_exist".parse::<Mode>().is_err());
+
+        // Only 1 decodes to Enabled; everything else falls back to Disabled.
+        for (raw, expected) in [(0u8, Mode::Disabled), (1, Mode::Enabled), (2, Mode::Disabled)] {
+            assert_eq!(Mode::from(raw), expected);
+        }
+    }
+
#[test]
fn instrumented_registry() {
- let reg = Arc::new(InstrumentedObjectStoreRegistry::new(Arc::new(
- DefaultObjectStoreRegistry::new(),
- )));
+ let reg = Arc::new(InstrumentedObjectStoreRegistry::new(
+ Arc::new(DefaultObjectStoreRegistry::new()),
+ InstrumentedObjectStoreMode::default(),
+ ));
let store = object_store::memory::InMemory::new();
let url = "mem://test".parse().unwrap();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]