This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a1acf775a9 Adds instrumentation to get requests for datafusion-cli 
(#18016)
a1acf775a9 is described below

commit a1acf775a9f358d153ae01d789e6b7292f66e68d
Author: Blake Orth <[email protected]>
AuthorDate: Sat Oct 11 07:13:40 2025 -0600

    Adds instrumentation to get requests for datafusion-cli (#18016)
    
    * Adds instrumentation to get requests for datafusion-cli
     - Adds request instrumentation to GET operations when using the
       InstrumentedObjectStore in datafusion-cli
     - Adds RequestDetails type to store and format details about
       instrumented requests
     - Adds tests for new functionality
    
    * Add cli-integration test
    
    * Verify the contents of the request
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 Cargo.lock                                         |   1 +
 datafusion-cli/Cargo.toml                          |   1 +
 datafusion-cli/src/object_storage/instrumented.rs  | 149 ++++++++++++++++++++-
 datafusion-cli/src/print_options.rs                |  11 +-
 datafusion-cli/tests/cli_integration.rs            |  79 +++++++++--
 .../object_store_profiling@s3_url_fallback.snap    |  53 ++++++++
 6 files changed, 282 insertions(+), 12 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 05c70d83b8..6bc85e2a7d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1933,6 +1933,7 @@ dependencies = [
  "async-trait",
  "aws-config",
  "aws-credential-types",
+ "chrono",
  "clap 4.5.48",
  "ctor",
  "datafusion",
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 733da67dbf..fda9309a5d 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -39,6 +39,7 @@ arrow = { workspace = true }
 async-trait = { workspace = true }
 aws-config = "1.8.6"
 aws-credential-types = "1.2.7"
+chrono = { workspace = true }
 clap = { version = "4.5.47", features = ["derive", "cargo"] }
 datafusion = { workspace = true, features = [
     "avro",
diff --git a/datafusion-cli/src/object_storage/instrumented.rs 
b/datafusion-cli/src/object_storage/instrumented.rs
index 49f174799c..b579257af3 100644
--- a/datafusion-cli/src/object_storage/instrumented.rs
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -22,19 +22,22 @@ use std::{
         atomic::{AtomicU8, Ordering},
         Arc,
     },
+    time::Duration,
 };
 
 use async_trait::async_trait;
+use chrono::Utc;
 use datafusion::{
+    common::instant::Instant,
     error::DataFusionError,
     execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
 };
 use futures::stream::BoxStream;
 use object_store::{
-    path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
+    path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, 
ObjectMeta,
     ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, 
Result,
 };
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
 use url::Url;
 
 /// The profiling mode to use for an [`InstrumentedObjectStore`] instance. 
Collecting profiling
@@ -81,6 +84,7 @@ impl From<u8> for InstrumentedObjectStoreMode {
 pub struct InstrumentedObjectStore {
     inner: Arc<dyn ObjectStore>,
     instrument_mode: AtomicU8,
+    requests: Mutex<Vec<RequestDetails>>,
 }
 
 impl InstrumentedObjectStore {
@@ -89,12 +93,46 @@ impl InstrumentedObjectStore {
         Self {
             inner: object_store,
             instrument_mode,
+            requests: Mutex::new(Vec::new()),
         }
     }
 
     fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
         self.instrument_mode.store(mode as u8, Ordering::Relaxed)
     }
+
+    /// Returns all [`RequestDetails`] accumulated in this 
[`InstrumentedObjectStore`] and clears
+    /// the stored requests
+    pub fn take_requests(&self) -> Vec<RequestDetails> {
+        let mut req = self.requests.lock();
+
+        req.drain(..).collect()
+    }
+
+    async fn instrumented_get_opts(
+        &self,
+        location: &Path,
+        options: GetOptions,
+    ) -> Result<GetResult> {
+        let timestamp = Utc::now();
+        let range = options.range.clone();
+
+        let start = Instant::now();
+        let ret = self.inner.get_opts(location, options).await?;
+        let elapsed = start.elapsed();
+
+        self.requests.lock().push(RequestDetails {
+            op: Operation::Get,
+            path: location.clone(),
+            timestamp,
+            duration: Some(elapsed),
+            size: Some((ret.range.end - ret.range.start) as usize),
+            range,
+            extra_display: None,
+        });
+
+        Ok(ret)
+    }
 }
 
 impl fmt::Display for InstrumentedObjectStore {
@@ -129,6 +167,12 @@ impl ObjectStore for InstrumentedObjectStore {
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
+        if self.instrument_mode.load(Ordering::Relaxed)
+            != InstrumentedObjectStoreMode::Disabled as u8
+        {
+            return self.instrumented_get_opts(location, options).await;
+        }
+
         self.inner.get_opts(location, options).await
     }
 
@@ -157,6 +201,55 @@ impl ObjectStore for InstrumentedObjectStore {
     }
 }
 
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
+enum Operation {
+    _Copy,
+    _Delete,
+    Get,
+    _Head,
+    _List,
+    _Put,
+}
+
+/// Holds profiling details about individual requests made through an 
[`InstrumentedObjectStore`]
+#[derive(Debug)]
+pub struct RequestDetails {
+    op: Operation,
+    path: Path,
+    timestamp: chrono::DateTime<Utc>,
+    duration: Option<Duration>,
+    size: Option<usize>,
+    range: Option<GetRange>,
+    extra_display: Option<String>,
+}
+
+impl fmt::Display for RequestDetails {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut output_parts = vec![format!(
+            "{} operation={:?}",
+            self.timestamp.to_rfc3339(),
+            self.op
+        )];
+
+        if let Some(d) = self.duration {
+            output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
+        }
+        if let Some(s) = self.size {
+            output_parts.push(format!("size={s}"));
+        }
+        if let Some(r) = &self.range {
+            output_parts.push(format!("range: {r}"));
+        }
+        output_parts.push(format!("path={}", self.path));
+
+        if let Some(ed) = &self.extra_display {
+            output_parts.push(ed.clone());
+        }
+
+        write!(f, "{}", output_parts.join(" "))
+    }
+}
+
 /// Provides access to [`InstrumentedObjectStore`] instances that record 
requests for reporting
 #[derive(Debug)]
 pub struct InstrumentedObjectStoreRegistry {
@@ -275,4 +368,56 @@ mod tests {
         assert!(fetched.is_ok());
         assert_eq!(reg.stores().len(), 1);
     }
+
+    #[tokio::test]
+    async fn instrumented_store() {
+        let store = Arc::new(object_store::memory::InMemory::new());
+        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
+        let instrumented = InstrumentedObjectStore::new(store, mode);
+
+        // Load the test store with some data we can read
+        let path = Path::from("test/data");
+        let payload = PutPayload::from_static(b"test_data");
+        instrumented.put(&path, payload).await.unwrap();
+
+        // By default no requests should be instrumented/stored
+        assert!(instrumented.requests.lock().is_empty());
+        let _ = instrumented.get(&path).await.unwrap();
+        assert!(instrumented.requests.lock().is_empty());
+
+        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Enabled);
+        assert!(instrumented.requests.lock().is_empty());
+        let _ = instrumented.get(&path).await.unwrap();
+        assert_eq!(instrumented.requests.lock().len(), 1);
+
+        let mut requests = instrumented.take_requests();
+        assert_eq!(requests.len(), 1);
+        assert!(instrumented.requests.lock().is_empty());
+
+        let request = requests.pop().unwrap();
+        assert_eq!(request.op, Operation::Get);
+        assert_eq!(request.path, path);
+        assert!(request.duration.is_some());
+        assert_eq!(request.size, Some(9));
+        assert_eq!(request.range, None);
+        assert!(request.extra_display.is_none());
+    }
+
+    #[test]
+    fn request_details() {
+        let rd = RequestDetails {
+            op: Operation::Get,
+            path: Path::from("test"),
+            timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
+            duration: Some(Duration::new(5, 0)),
+            size: Some(10),
+            range: Some((..10).into()),
+            extra_display: Some(String::from("extra info")),
+        };
+
+        assert_eq!(
+            format!("{rd}"),
+            "1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s 
size=10 range: bytes=0-9 path=test extra info"
+        );
+    }
 }
diff --git a/datafusion-cli/src/print_options.rs 
b/datafusion-cli/src/print_options.rs
index 0df0106fb1..0d5e1e3e6f 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -193,7 +193,16 @@ impl PrintOptions {
             {
                 writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?;
                 for store in self.instrumented_registry.stores() {
-                    writeln!(writer, "{store}")?;
+                    let requests = store.take_requests();
+
+                    if !requests.is_empty() {
+                        writeln!(writer, "{store}")?;
+                        for req in requests.iter() {
+                            writeln!(writer, "{req}")?;
+                        }
+                        // Add an extra blank line to help visually organize 
the output
+                        writeln!(writer)?;
+                    }
                 }
             }
         }
diff --git a/datafusion-cli/tests/cli_integration.rs 
b/datafusion-cli/tests/cli_integration.rs
index b92b0790ba..cdb442c47f 100644
--- a/datafusion-cli/tests/cli_integration.rs
+++ b/datafusion-cli/tests/cli_integration.rs
@@ -19,6 +19,7 @@ use std::process::Command;
 
 use rstest::rstest;
 
+use async_trait::async_trait;
 use insta::{glob, Settings};
 use insta_cmd::{assert_cmd_snapshot, get_cargo_bin};
 use std::path::PathBuf;
@@ -374,8 +375,6 @@ async fn test_s3_url_fallback() {
     settings.set_snapshot_suffix("s3_url_fallback");
     let _bound = settings.bind_to_scope();
 
-    let port = container.get_host_port_ipv4(9000).await.unwrap();
-
     // Create a table using a prefix path (without trailing slash)
     // This should trigger the fallback logic where head() fails on the prefix
     // and list() is used to discover the actual files
@@ -389,11 +388,73 @@ OPTIONS (
 SELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;
 "#;
 
-    assert_cmd_snapshot!(cli()
-        .env_clear()
-        .env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
-        .env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
-        .env("AWS_ENDPOINT", format!("http://localhost:{port}"))
-        .env("AWS_ALLOW_HTTP", "true")
-        .pass_stdin(input));
+    assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
+}
+
+/// Validate object store profiling output
+#[tokio::test]
+async fn test_object_store_profiling() {
+    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
+        eprintln!("Skipping external storages integration tests");
+        return;
+    }
+
+    let container = setup_minio_container().await;
+
+    let mut settings = make_settings();
+    settings.set_snapshot_suffix("s3_url_fallback");
+
+    // as the object store profiling contains timestamps and durations, we must
+    // filter them out to have stable snapshots
+    //
+    // Example line to filter:
+    // 2025-10-11T12:02:59.722646+00:00 operation=Get duration=0.001495s 
size=1006 path=cars.csv
+    // Output:
+    // <TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
+    settings.add_filter(
+        r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2} 
operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s size=(\d+) path=(.*)",
+        "<TIMESTAMP> operation=$1 duration=[DURATION] size=$2 path=$3",
+    );
+
+    let _bound = settings.bind_to_scope();
+
+    let input = r#"
+    CREATE EXTERNAL TABLE CARS
+STORED AS CSV
+LOCATION 's3://data/cars.csv';
+
+-- Initial query should not show any profiling as the object store is not 
instrumented yet
+SELECT * from CARS LIMIT 1;
+\object_store_profiling enabled
+-- Query again to see the profiling output
+SELECT * from CARS LIMIT 1;
+\object_store_profiling disabled
+-- Final query should not show any profiling as we disabled it again
+SELECT * from CARS LIMIT 1;
+"#;
+
+    assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
+}
+
+/// Extension trait to Add the minio connection information to a Command
+#[async_trait]
+trait MinioCommandExt {
+    async fn with_minio(&mut self, container: &ContainerAsync<minio::MinIO>)
+        -> &mut Self;
+}
+
+#[async_trait]
+impl MinioCommandExt for Command {
+    async fn with_minio(
+        &mut self,
+        container: &ContainerAsync<minio::MinIO>,
+    ) -> &mut Self {
+        let port = container.get_host_port_ipv4(9000).await.unwrap();
+
+        self.env_clear()
+            .env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
+            .env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
+            .env("AWS_ENDPOINT", format!("http://localhost:{port}"))
+            .env("AWS_ALLOW_HTTP", "true")
+    }
 }
diff --git 
a/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap 
b/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap
new file mode 100644
index 0000000000..921efa7a1c
--- /dev/null
+++ b/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap
@@ -0,0 +1,53 @@
+---
+source: datafusion-cli/tests/cli_integration.rs
+info:
+  program: datafusion-cli
+  args: []
+  env:
+    AWS_ACCESS_KEY_ID: TEST-DataFusionLogin
+    AWS_ALLOW_HTTP: "true"
+    AWS_ENDPOINT: "http://localhost:55031"
+    AWS_SECRET_ACCESS_KEY: TEST-DataFusionPassword
+  stdin: "\n    CREATE EXTERNAL TABLE CARS\nSTORED AS CSV\nLOCATION 
's3://data/cars.csv';\n\n-- Initial query should not show any profiling as the 
object store is not instrumented yet\nSELECT * from CARS LIMIT 
1;\n\\object_store_profiling enabled\n-- Query again to see the profiling 
output\nSELECT * from CARS LIMIT 1;\n\\object_store_profiling disabled\n-- 
Final query should not show any profiling as we disabled it again\nSELECT * 
from CARS LIMIT 1;\n"
+snapshot_kind: text
+---
+success: true
+exit_code: 0
+----- stdout -----
+[CLI_VERSION]
+0 row(s) fetched. 
+[ELAPSED]
+
++-----+-------+---------------------+
+| car | speed | time                |
++-----+-------+---------------------+
+| red | 20.0  | 1996-04-12T12:05:03 |
++-----+-------+---------------------+
+1 row(s) fetched. 
+[ELAPSED]
+
+ObjectStore Profile mode set to Enabled
++-----+-------+---------------------+
+| car | speed | time                |
++-----+-------+---------------------+
+| red | 20.0  | 1996-04-12T12:05:03 |
++-----+-------+---------------------+
+1 row(s) fetched. 
+[ELAPSED]
+
+Object Store Profiling
+Instrumented Object Store: instrument_mode: Enabled, inner: AmazonS3(data)
+<TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
+
+ObjectStore Profile mode set to Disabled
++-----+-------+---------------------+
+| car | speed | time                |
++-----+-------+---------------------+
+| red | 20.0  | 1996-04-12T12:05:03 |
++-----+-------+---------------------+
+1 row(s) fetched. 
+[ELAPSED]
+
+\q
+
+----- stderr -----


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to