This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a1acf775a9 Adds instrumentation to get requests for datafusion-cli
(#18016)
a1acf775a9 is described below
commit a1acf775a9f358d153ae01d789e6b7292f66e68d
Author: Blake Orth <[email protected]>
AuthorDate: Sat Oct 11 07:13:40 2025 -0600
Adds instrumentation to get requests for datafusion-cli (#18016)
* Adds instrumentation to get requests for datafusion-cli
- Adds request instrumentation to GET operations when using the
InstrumentedObjectStore in datafusion-cli
- Adds RequestDetails type to store and format details about
instrumented requests
- Adds tests for new functionality
* Add cli-integration test
* Verify the contents of the request
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
Cargo.lock | 1 +
datafusion-cli/Cargo.toml | 1 +
datafusion-cli/src/object_storage/instrumented.rs | 149 ++++++++++++++++++++-
datafusion-cli/src/print_options.rs | 11 +-
datafusion-cli/tests/cli_integration.rs | 79 +++++++++--
.../object_store_profiling@s3_url_fallback.snap | 53 ++++++++
6 files changed, 282 insertions(+), 12 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 05c70d83b8..6bc85e2a7d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1933,6 +1933,7 @@ dependencies = [
"async-trait",
"aws-config",
"aws-credential-types",
+ "chrono",
"clap 4.5.48",
"ctor",
"datafusion",
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index 733da67dbf..fda9309a5d 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -39,6 +39,7 @@ arrow = { workspace = true }
async-trait = { workspace = true }
aws-config = "1.8.6"
aws-credential-types = "1.2.7"
+chrono = { workspace = true }
clap = { version = "4.5.47", features = ["derive", "cargo"] }
datafusion = { workspace = true, features = [
"avro",
diff --git a/datafusion-cli/src/object_storage/instrumented.rs
b/datafusion-cli/src/object_storage/instrumented.rs
index 49f174799c..b579257af3 100644
--- a/datafusion-cli/src/object_storage/instrumented.rs
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -22,19 +22,22 @@ use std::{
atomic::{AtomicU8, Ordering},
Arc,
},
+ time::Duration,
};
use async_trait::async_trait;
+use chrono::Utc;
use datafusion::{
+ common::instant::Instant,
error::DataFusionError,
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};
use futures::stream::BoxStream;
use object_store::{
- path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
+ path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload,
ObjectMeta,
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult,
Result,
};
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
use url::Url;
/// The profiling mode to use for an [`InstrumentedObjectStore`] instance.
Collecting profiling
@@ -81,6 +84,7 @@ impl From<u8> for InstrumentedObjectStoreMode {
pub struct InstrumentedObjectStore {
inner: Arc<dyn ObjectStore>,
instrument_mode: AtomicU8,
+ requests: Mutex<Vec<RequestDetails>>,
}
impl InstrumentedObjectStore {
@@ -89,12 +93,46 @@ impl InstrumentedObjectStore {
Self {
inner: object_store,
instrument_mode,
+ requests: Mutex::new(Vec::new()),
}
}
fn set_instrument_mode(&self, mode: InstrumentedObjectStoreMode) {
self.instrument_mode.store(mode as u8, Ordering::Relaxed)
}
+
+ /// Returns all [`RequestDetails`] accumulated in this
[`InstrumentedObjectStore`] and clears
+ /// the stored requests
+ pub fn take_requests(&self) -> Vec<RequestDetails> {
+ let mut req = self.requests.lock();
+
+ req.drain(..).collect()
+ }
+
+ async fn instrumented_get_opts(
+ &self,
+ location: &Path,
+ options: GetOptions,
+ ) -> Result<GetResult> {
+ let timestamp = Utc::now();
+ let range = options.range.clone();
+
+ let start = Instant::now();
+ let ret = self.inner.get_opts(location, options).await?;
+ let elapsed = start.elapsed();
+
+ self.requests.lock().push(RequestDetails {
+ op: Operation::Get,
+ path: location.clone(),
+ timestamp,
+ duration: Some(elapsed),
+ size: Some((ret.range.end - ret.range.start) as usize),
+ range,
+ extra_display: None,
+ });
+
+ Ok(ret)
+ }
}
impl fmt::Display for InstrumentedObjectStore {
@@ -129,6 +167,12 @@ impl ObjectStore for InstrumentedObjectStore {
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
+ if self.instrument_mode.load(Ordering::Relaxed)
+ != InstrumentedObjectStoreMode::Disabled as u8
+ {
+ return self.instrumented_get_opts(location, options).await;
+ }
+
self.inner.get_opts(location, options).await
}
@@ -157,6 +201,55 @@ impl ObjectStore for InstrumentedObjectStore {
}
}
+#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
+enum Operation {
+ _Copy,
+ _Delete,
+ Get,
+ _Head,
+ _List,
+ _Put,
+}
+
+/// Holds profiling details about individual requests made through an
[`InstrumentedObjectStore`]
+#[derive(Debug)]
+pub struct RequestDetails {
+ op: Operation,
+ path: Path,
+ timestamp: chrono::DateTime<Utc>,
+ duration: Option<Duration>,
+ size: Option<usize>,
+ range: Option<GetRange>,
+ extra_display: Option<String>,
+}
+
+impl fmt::Display for RequestDetails {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let mut output_parts = vec![format!(
+ "{} operation={:?}",
+ self.timestamp.to_rfc3339(),
+ self.op
+ )];
+
+ if let Some(d) = self.duration {
+ output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
+ }
+ if let Some(s) = self.size {
+ output_parts.push(format!("size={s}"));
+ }
+ if let Some(r) = &self.range {
+ output_parts.push(format!("range: {r}"));
+ }
+ output_parts.push(format!("path={}", self.path));
+
+ if let Some(ed) = &self.extra_display {
+ output_parts.push(ed.clone());
+ }
+
+ write!(f, "{}", output_parts.join(" "))
+ }
+}
+
/// Provides access to [`InstrumentedObjectStore`] instances that record
requests for reporting
#[derive(Debug)]
pub struct InstrumentedObjectStoreRegistry {
@@ -275,4 +368,56 @@ mod tests {
assert!(fetched.is_ok());
assert_eq!(reg.stores().len(), 1);
}
+
+ #[tokio::test]
+ async fn instrumented_store() {
+ let store = Arc::new(object_store::memory::InMemory::new());
+ let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
+ let instrumented = InstrumentedObjectStore::new(store, mode);
+
+ // Load the test store with some data we can read
+ let path = Path::from("test/data");
+ let payload = PutPayload::from_static(b"test_data");
+ instrumented.put(&path, payload).await.unwrap();
+
+ // By default no requests should be instrumented/stored
+ assert!(instrumented.requests.lock().is_empty());
+ let _ = instrumented.get(&path).await.unwrap();
+ assert!(instrumented.requests.lock().is_empty());
+
+ instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Enabled);
+ assert!(instrumented.requests.lock().is_empty());
+ let _ = instrumented.get(&path).await.unwrap();
+ assert_eq!(instrumented.requests.lock().len(), 1);
+
+ let mut requests = instrumented.take_requests();
+ assert_eq!(requests.len(), 1);
+ assert!(instrumented.requests.lock().is_empty());
+
+ let request = requests.pop().unwrap();
+ assert_eq!(request.op, Operation::Get);
+ assert_eq!(request.path, path);
+ assert!(request.duration.is_some());
+ assert_eq!(request.size, Some(9));
+ assert_eq!(request.range, None);
+ assert!(request.extra_display.is_none());
+ }
+
+ #[test]
+ fn request_details() {
+ let rd = RequestDetails {
+ op: Operation::Get,
+ path: Path::from("test"),
+ timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
+ duration: Some(Duration::new(5, 0)),
+ size: Some(10),
+ range: Some((..10).into()),
+ extra_display: Some(String::from("extra info")),
+ };
+
+ assert_eq!(
+ format!("{rd}"),
+ "1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s
size=10 range: bytes=0-9 path=test extra info"
+ );
+ }
}
diff --git a/datafusion-cli/src/print_options.rs
b/datafusion-cli/src/print_options.rs
index 0df0106fb1..0d5e1e3e6f 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -193,7 +193,16 @@ impl PrintOptions {
{
writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?;
for store in self.instrumented_registry.stores() {
- writeln!(writer, "{store}")?;
+ let requests = store.take_requests();
+
+ if !requests.is_empty() {
+ writeln!(writer, "{store}")?;
+ for req in requests.iter() {
+ writeln!(writer, "{req}")?;
+ }
+ // Add an extra blank line to help visually organize
the output
+ writeln!(writer)?;
+ }
}
}
}
diff --git a/datafusion-cli/tests/cli_integration.rs
b/datafusion-cli/tests/cli_integration.rs
index b92b0790ba..cdb442c47f 100644
--- a/datafusion-cli/tests/cli_integration.rs
+++ b/datafusion-cli/tests/cli_integration.rs
@@ -19,6 +19,7 @@ use std::process::Command;
use rstest::rstest;
+use async_trait::async_trait;
use insta::{glob, Settings};
use insta_cmd::{assert_cmd_snapshot, get_cargo_bin};
use std::path::PathBuf;
@@ -374,8 +375,6 @@ async fn test_s3_url_fallback() {
settings.set_snapshot_suffix("s3_url_fallback");
let _bound = settings.bind_to_scope();
- let port = container.get_host_port_ipv4(9000).await.unwrap();
-
// Create a table using a prefix path (without trailing slash)
// This should trigger the fallback logic where head() fails on the prefix
// and list() is used to discover the actual files
@@ -389,11 +388,73 @@ OPTIONS (
SELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;
"#;
- assert_cmd_snapshot!(cli()
- .env_clear()
- .env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
- .env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
- .env("AWS_ENDPOINT", format!("http://localhost:{port}"))
- .env("AWS_ALLOW_HTTP", "true")
- .pass_stdin(input));
+ assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
+}
+
+/// Validate object store profiling output
+#[tokio::test]
+async fn test_object_store_profiling() {
+ if env::var("TEST_STORAGE_INTEGRATION").is_err() {
+ eprintln!("Skipping external storages integration tests");
+ return;
+ }
+
+ let container = setup_minio_container().await;
+
+ let mut settings = make_settings();
+ settings.set_snapshot_suffix("s3_url_fallback");
+
+ // as the object store profiling contains timestamps and durations, we must
+ // filter them out to have stable snapshots
+ //
+ // Example line to filter:
+ // 2025-10-11T12:02:59.722646+00:00 operation=Get duration=0.001495s
size=1006 path=cars.csv
+ // Output:
+ // <TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
+ settings.add_filter(
+ r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2}
operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s size=(\d+) path=(.*)",
+ "<TIMESTAMP> operation=$1 duration=[DURATION] size=$2 path=$3",
+ );
+
+ let _bound = settings.bind_to_scope();
+
+ let input = r#"
+ CREATE EXTERNAL TABLE CARS
+STORED AS CSV
+LOCATION 's3://data/cars.csv';
+
+-- Initial query should not show any profiling as the object store is not
instrumented yet
+SELECT * from CARS LIMIT 1;
+\object_store_profiling enabled
+-- Query again to see the profiling output
+SELECT * from CARS LIMIT 1;
+\object_store_profiling disabled
+-- Final query should not show any profiling as we disabled it again
+SELECT * from CARS LIMIT 1;
+"#;
+
+ assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
+}
+
+/// Extension trait to add the MinIO connection information to a Command
+#[async_trait]
+trait MinioCommandExt {
+ async fn with_minio(&mut self, container: &ContainerAsync<minio::MinIO>)
+ -> &mut Self;
+}
+
+#[async_trait]
+impl MinioCommandExt for Command {
+ async fn with_minio(
+ &mut self,
+ container: &ContainerAsync<minio::MinIO>,
+ ) -> &mut Self {
+ let port = container.get_host_port_ipv4(9000).await.unwrap();
+
+ self.env_clear()
+ .env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
+ .env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
+ .env("AWS_ENDPOINT", format!("http://localhost:{port}"))
+ .env("AWS_ALLOW_HTTP", "true")
+ }
}
diff --git
a/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap
b/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap
new file mode 100644
index 0000000000..921efa7a1c
--- /dev/null
+++ b/datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap
@@ -0,0 +1,53 @@
+---
+source: datafusion-cli/tests/cli_integration.rs
+info:
+ program: datafusion-cli
+ args: []
+ env:
+ AWS_ACCESS_KEY_ID: TEST-DataFusionLogin
+ AWS_ALLOW_HTTP: "true"
+ AWS_ENDPOINT: "http://localhost:55031"
+ AWS_SECRET_ACCESS_KEY: TEST-DataFusionPassword
+ stdin: "\n CREATE EXTERNAL TABLE CARS\nSTORED AS CSV\nLOCATION
's3://data/cars.csv';\n\n-- Initial query should not show any profiling as the
object store is not instrumented yet\nSELECT * from CARS LIMIT
1;\n\\object_store_profiling enabled\n-- Query again to see the profiling
output\nSELECT * from CARS LIMIT 1;\n\\object_store_profiling disabled\n--
Final query should not show any profiling as we disabled it again\nSELECT *
from CARS LIMIT 1;\n"
+snapshot_kind: text
+---
+success: true
+exit_code: 0
+----- stdout -----
+[CLI_VERSION]
+0 row(s) fetched.
+[ELAPSED]
+
++-----+-------+---------------------+
+| car | speed | time |
++-----+-------+---------------------+
+| red | 20.0 | 1996-04-12T12:05:03 |
++-----+-------+---------------------+
+1 row(s) fetched.
+[ELAPSED]
+
+ObjectStore Profile mode set to Enabled
++-----+-------+---------------------+
+| car | speed | time |
++-----+-------+---------------------+
+| red | 20.0 | 1996-04-12T12:05:03 |
++-----+-------+---------------------+
+1 row(s) fetched.
+[ELAPSED]
+
+Object Store Profiling
+Instrumented Object Store: instrument_mode: Enabled, inner: AmazonS3(data)
+<TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
+
+ObjectStore Profile mode set to Disabled
++-----+-------+---------------------+
+| car | speed | time |
++-----+-------+---------------------+
+| red | 20.0 | 1996-04-12T12:05:03 |
++-----+-------+---------------------+
+1 row(s) fetched.
+[ELAPSED]
+
+\q
+
+----- stderr -----
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]