This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 39226c3f30 [datafusion-cli] Replace mutex with AtomicU64 for stream duration tracking in InstrumentedObjectStore (#20802)
39226c3f30 is described below

commit 39226c3f30969d7158373cc756f696dc5e550a07
Author: Burak Şen <[email protected]>
AuthorDate: Tue Mar 10 14:35:46 2026 +0300

    [datafusion-cli] Replace mutex with AtomicU64 for stream duration tracking in InstrumentedObjectStore (#20802)
    
    ## Which issue does this PR close?
    Related to #18138 but does not close any issue.
    
    ## Rationale for this change
    TimeToFirstItemStream held an Arc<Mutex<Vec<RequestDetails>>> and a
    request_index, and locked the shared request list from inside poll_next
    to write the duration back. @alamb and @BlakeOrth suggested this
    improvement while reviewing PR #19127, so this PR implements it.
    
    ## What changes are included in this PR?
    - Replace Arc<Mutex<Vec<RequestDetails>>> + index in
    TimeToFirstItemStream with a per-request Arc<AtomicU64>
    - Store duration as nanoseconds in AtomicU64 (0 = not yet set) with
    Release/Acquire ordering
    - Start the timer lazily on the first poll_next call instead of at
    stream creation, so time spent between creating the stream and first
    polling it is not counted as storage latency
    
    ## Are these changes tested?
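    Yes. Existing tests pass, and two timing checks were added: one verifies
    that a delay before the first poll is excluded from the recorded
    duration, and one verifies that latency inside the inner stream is
    captured.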
    
    ## Are there any user-facing changes?
    No
---
 datafusion-cli/src/object_storage/instrumented.rs | 209 ++++++++++++++--------
 1 file changed, 134 insertions(+), 75 deletions(-)

diff --git a/datafusion-cli/src/object_storage/instrumented.rs 
b/datafusion-cli/src/object_storage/instrumented.rs
index b4f1a043ac..a0321cacb3 100644
--- a/datafusion-cli/src/object_storage/instrumented.rs
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -21,7 +21,7 @@ use std::{
     str::FromStr,
     sync::{
         Arc,
-        atomic::{AtomicU8, Ordering},
+        atomic::{AtomicU8, AtomicU64, Ordering},
     },
     time::Duration,
 };
@@ -45,28 +45,25 @@ use object_store::{
 use parking_lot::{Mutex, RwLock};
 use url::Url;
 
-/// A stream wrapper that measures the time until the first response(item or 
end of stream) is yielded
+/// A stream wrapper that measures the time until the first response(item or 
end of stream) is yielded.
+///
+/// The timer starts on the first `poll_next` call (not at stream creation) to 
avoid
+/// measuring unrelated work between stream creation and first poll.
+/// Duration is stored as nanoseconds in an `AtomicU64` (0 = not yet set).
 struct TimeToFirstItemStream<S> {
     inner: S,
-    start: Instant,
-    request_index: usize,
-    requests: Arc<Mutex<Vec<RequestDetails>>>,
-    first_item_yielded: bool,
+    start: Option<Instant>,
+    request_duration: Arc<AtomicU64>,
+    duration_recorded: bool,
 }
 
 impl<S> TimeToFirstItemStream<S> {
-    fn new(
-        inner: S,
-        start: Instant,
-        request_index: usize,
-        requests: Arc<Mutex<Vec<RequestDetails>>>,
-    ) -> Self {
+    fn new(inner: S, request_duration: Arc<AtomicU64>) -> Self {
         Self {
             inner,
-            start,
-            request_index,
-            requests,
-            first_item_yielded: false,
+            start: None,
+            request_duration,
+            duration_recorded: false,
         }
     }
 }
@@ -81,16 +78,14 @@ where
         mut self: std::pin::Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
-        let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
+        let start = *self.start.get_or_insert_with(Instant::now);
 
-        if !self.first_item_yielded && poll_result.is_ready() {
-            self.first_item_yielded = true;
-            let elapsed = self.start.elapsed();
+        let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
 
-            let mut requests = self.requests.lock();
-            if let Some(request) = requests.get_mut(self.request_index) {
-                request.duration = Some(elapsed);
-            }
+        if !self.duration_recorded && poll_result.is_ready() {
+            self.duration_recorded = true;
+            let nanos = start.elapsed().as_nanos() as u64;
+            self.request_duration.store(nanos, Ordering::Release);
         }
 
         poll_result
@@ -191,7 +186,7 @@ impl InstrumentedObjectStore {
             op: Operation::Put,
             path: location.clone(),
             timestamp,
-            duration: Some(elapsed),
+            duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as 
u64)),
             size: Some(size),
             range: None,
             extra_display: None,
@@ -214,7 +209,7 @@ impl InstrumentedObjectStore {
             op: Operation::Put,
             path: location.clone(),
             timestamp,
-            duration: Some(elapsed),
+            duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as 
u64)),
             size: None,
             range: None,
             extra_display: None,
@@ -249,7 +244,7 @@ impl InstrumentedObjectStore {
             op,
             path: location.clone(),
             timestamp,
-            duration: Some(elapsed),
+            duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as 
u64)),
             size,
             range,
             extra_display: None,
@@ -274,7 +269,7 @@ impl InstrumentedObjectStore {
                     op: Operation::Delete,
                     path: location.clone(),
                     timestamp,
-                    duration: Some(elapsed),
+                    duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() 
as u64)),
                     size: None,
                     range: None,
                     extra_display: None,
@@ -289,31 +284,20 @@ impl InstrumentedObjectStore {
         prefix: Option<&Path>,
     ) -> BoxStream<'static, Result<ObjectMeta>> {
         let timestamp = Utc::now();
-        let start = Instant::now();
         let inner_stream = self.inner.list(prefix);
 
-        let request_index = {
-            let mut requests = self.requests.lock();
-            requests.push(RequestDetails {
-                op: Operation::List,
-                path: prefix.cloned().unwrap_or_else(|| Path::from("")),
-                timestamp,
-                duration: None,
-                size: None,
-                range: None,
-                extra_display: None,
-            });
-            requests.len() - 1
-        };
-
-        let wrapped_stream = TimeToFirstItemStream::new(
-            inner_stream,
-            start,
-            request_index,
-            Arc::clone(&self.requests),
-        );
+        let duration_nanos = Arc::new(AtomicU64::new(0));
+        self.requests.lock().push(RequestDetails {
+            op: Operation::List,
+            path: prefix.cloned().unwrap_or_else(|| Path::from("")),
+            timestamp,
+            duration_nanos: Arc::clone(&duration_nanos),
+            size: None,
+            range: None,
+            extra_display: None,
+        });
 
-        Box::pin(wrapped_stream)
+        Box::pin(TimeToFirstItemStream::new(inner_stream, duration_nanos))
     }
 
     async fn instrumented_list_with_delimiter(
@@ -329,7 +313,7 @@ impl InstrumentedObjectStore {
             op: Operation::List,
             path: prefix.cloned().unwrap_or_else(|| Path::from("")),
             timestamp,
-            duration: Some(elapsed),
+            duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as 
u64)),
             size: None,
             range: None,
             extra_display: None,
@@ -348,7 +332,7 @@ impl InstrumentedObjectStore {
             op: Operation::Copy,
             path: from.clone(),
             timestamp,
-            duration: Some(elapsed),
+            duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as 
u64)),
             size: None,
             range: None,
             extra_display: Some(format!("copy_to: {to}")),
@@ -371,7 +355,7 @@ impl InstrumentedObjectStore {
             op: Operation::Copy,
             path: from.clone(),
             timestamp,
-            duration: Some(elapsed),
+            duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as 
u64)),
             size: None,
             range: None,
             extra_display: Some(format!("copy_to: {to}")),
@@ -494,17 +478,42 @@ impl fmt::Display for Operation {
 }
 
 /// Holds profiling details about individual requests made through an 
[`InstrumentedObjectStore`]
-#[derive(Debug)]
 pub struct RequestDetails {
     op: Operation,
     path: Path,
     timestamp: chrono::DateTime<Utc>,
-    duration: Option<Duration>,
+    /// Duration stored as nanoseconds in an AtomicU64. 0 means not yet set.
+    duration_nanos: Arc<AtomicU64>,
     size: Option<usize>,
     range: Option<GetRange>,
     extra_display: Option<String>,
 }
 
+impl fmt::Debug for RequestDetails {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("RequestDetails")
+            .field("op", &self.op)
+            .field("path", &self.path)
+            .field("timestamp", &self.timestamp)
+            .field("duration", &self.duration())
+            .field("size", &self.size)
+            .field("range", &self.range)
+            .field("extra_display", &self.extra_display)
+            .finish()
+    }
+}
+
+impl RequestDetails {
+    fn duration(&self) -> Option<Duration> {
+        let nanos = self.duration_nanos.load(Ordering::Acquire);
+        if nanos == 0 {
+            None
+        } else {
+            Some(Duration::from_nanos(nanos))
+        }
+    }
+}
+
 impl fmt::Display for RequestDetails {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         let mut output_parts = vec![format!(
@@ -513,7 +522,7 @@ impl fmt::Display for RequestDetails {
             self.op
         )];
 
-        if let Some(d) = self.duration {
+        if let Some(d) = self.duration() {
             output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
         }
         if let Some(s) = self.size {
@@ -700,7 +709,7 @@ impl RequestSummary {
     }
     fn push(&mut self, request: &RequestDetails) {
         self.count += 1;
-        if let Some(dur) = request.duration {
+        if let Some(dur) = request.duration() {
             self.duration_stats.get_or_insert_default().push(dur)
         }
         if let Some(size) = request.size {
@@ -916,7 +925,7 @@ mod tests {
         let request = requests.pop().unwrap();
         assert_eq!(request.op, Operation::Get);
         assert_eq!(request.path, path);
-        assert!(request.duration.is_some());
+        assert!(request.duration().is_some());
         assert_eq!(request.size, Some(9));
         assert_eq!(request.range, None);
         assert!(request.extra_display.is_none());
@@ -945,7 +954,7 @@ mod tests {
         let request = requests.pop().unwrap();
         assert_eq!(request.op, Operation::Delete);
         assert_eq!(request.path, path);
-        assert!(request.duration.is_some());
+        assert!(request.duration().is_some());
         assert!(request.size.is_none());
         assert!(request.range.is_none());
         assert!(request.extra_display.is_none());
@@ -963,19 +972,57 @@ mod tests {
         instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
         assert!(instrumented.requests.lock().is_empty());
         let mut stream = instrumented.list(Some(&path));
-        // Consume at least one item from the stream to trigger duration 
measurement
+        // Sleep between stream creation and first poll to verify the timer
+        // starts on first poll, not at stream creation.
+        let delay = Duration::from_millis(50);
+        tokio::time::sleep(delay).await;
         let _ = stream.next().await;
         assert_eq!(instrumented.requests.lock().len(), 1);
 
         let request = instrumented.take_requests().pop().unwrap();
         assert_eq!(request.op, Operation::List);
         assert_eq!(request.path, path);
-        assert!(request.duration.is_some());
+        let duration = request
+            .duration()
+            .expect("duration should be set after consuming stream");
+        assert!(
+            duration < delay,
+            "duration {duration:?} should exclude the {delay:?} sleep before 
first poll"
+        );
         assert!(request.size.is_none());
         assert!(request.range.is_none());
         assert!(request.extra_display.is_none());
     }
 
+    #[tokio::test]
+    async fn time_to_first_item_stream_captures_inner_latency() {
+        let inner_delay = Duration::from_millis(50);
+        let inner_stream = futures::stream::once(async move {
+            tokio::time::sleep(inner_delay).await;
+            Ok(ObjectMeta {
+                location: Path::from("test"),
+                last_modified: Utc::now(),
+                size: 0,
+                e_tag: None,
+                version: None,
+            })
+        })
+        .boxed();
+
+        let duration_nanos = Arc::new(AtomicU64::new(0));
+        let mut stream = Box::pin(TimeToFirstItemStream::new(
+            inner_stream,
+            Arc::clone(&duration_nanos),
+        ));
+        let _ = stream.next().await;
+
+        let recorded = 
Duration::from_nanos(duration_nanos.load(Ordering::Acquire));
+        assert!(
+            recorded >= inner_delay,
+            "recorded duration {recorded:?} should be >= inner stream delay 
{inner_delay:?}"
+        );
+    }
+
     #[tokio::test]
     async fn instrumented_store_list_with_delimiter() {
         let (instrumented, path) = setup_test_store().await;
@@ -993,7 +1040,7 @@ mod tests {
         let request = instrumented.take_requests().pop().unwrap();
         assert_eq!(request.op, Operation::List);
         assert_eq!(request.path, path);
-        assert!(request.duration.is_some());
+        assert!(request.duration().is_some());
         assert!(request.size.is_none());
         assert!(request.range.is_none());
         assert!(request.extra_display.is_none());
@@ -1024,7 +1071,7 @@ mod tests {
         let request = instrumented.take_requests().pop().unwrap();
         assert_eq!(request.op, Operation::Put);
         assert_eq!(request.path, path);
-        assert!(request.duration.is_some());
+        assert!(request.duration().is_some());
         assert_eq!(request.size.unwrap(), size);
         assert!(request.range.is_none());
         assert!(request.extra_display.is_none());
@@ -1059,7 +1106,7 @@ mod tests {
         let request = instrumented.take_requests().pop().unwrap();
         assert_eq!(request.op, Operation::Put);
         assert_eq!(request.path, path);
-        assert!(request.duration.is_some());
+        assert!(request.duration().is_some());
         assert!(request.size.is_none());
         assert!(request.range.is_none());
         assert!(request.extra_display.is_none());
@@ -1087,7 +1134,7 @@ mod tests {
         let request = requests.pop().unwrap();
         assert_eq!(request.op, Operation::Copy);
         assert_eq!(request.path, path);
-        assert!(request.duration.is_some());
+        assert!(request.duration().is_some());
         assert!(request.size.is_none());
         assert!(request.range.is_none());
         assert_eq!(
@@ -1126,7 +1173,7 @@ mod tests {
         let request = requests.pop().unwrap();
         assert_eq!(request.op, Operation::Copy);
         assert_eq!(request.path, path);
-        assert!(request.duration.is_some());
+        assert!(request.duration().is_some());
         assert!(request.size.is_none());
         assert!(request.range.is_none());
         assert_eq!(
@@ -1156,7 +1203,7 @@ mod tests {
         let request = requests.pop().unwrap();
         assert_eq!(request.op, Operation::Head);
         assert_eq!(request.path, path);
-        assert!(request.duration.is_some());
+        assert!(request.duration().is_some());
         assert!(request.size.is_none());
         assert!(request.range.is_none());
         assert!(request.extra_display.is_none());
@@ -1168,7 +1215,9 @@ mod tests {
             op: Operation::Get,
             path: Path::from("test"),
             timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
-            duration: Some(Duration::new(5, 0)),
+            duration_nanos: Arc::new(AtomicU64::new(
+                Duration::new(5, 0).as_nanos() as u64
+            )),
             size: Some(10),
             range: Some((..10).into()),
             extra_display: Some(String::from("extra info")),
@@ -1195,7 +1244,9 @@ mod tests {
             op: Operation::Get,
             path: Path::from("test1"),
             timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
-            duration: Some(Duration::from_secs(5)),
+            duration_nanos: Arc::new(AtomicU64::new(
+                Duration::from_secs(5).as_nanos() as u64
+            )),
             size: Some(100),
             range: None,
             extra_display: None,
@@ -1215,7 +1266,9 @@ mod tests {
             op: Operation::Get,
             path: Path::from("test2"),
             timestamp: chrono::DateTime::from_timestamp(1, 0).unwrap(),
-            duration: Some(Duration::from_secs(8)),
+            duration_nanos: Arc::new(AtomicU64::new(
+                Duration::from_secs(8).as_nanos() as u64
+            )),
             size: Some(150),
             range: None,
             extra_display: None,
@@ -1224,7 +1277,9 @@ mod tests {
             op: Operation::Get,
             path: Path::from("test3"),
             timestamp: chrono::DateTime::from_timestamp(2, 0).unwrap(),
-            duration: Some(Duration::from_secs(2)),
+            duration_nanos: Arc::new(AtomicU64::new(
+                Duration::from_secs(2).as_nanos() as u64
+            )),
             size: Some(50),
             range: None,
             extra_display: None,
@@ -1243,7 +1298,9 @@ mod tests {
             op: Operation::Put,
             path: Path::from("test4"),
             timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
-            duration: Some(Duration::from_millis(200)),
+            duration_nanos: Arc::new(AtomicU64::new(
+                Duration::from_millis(200).as_nanos() as u64,
+            )),
             size: Some(75),
             range: None,
             extra_display: None,
@@ -1268,7 +1325,9 @@ mod tests {
             op: Operation::Get,
             path: Path::from("test1"),
             timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
-            duration: Some(Duration::from_secs(3)),
+            duration_nanos: Arc::new(AtomicU64::new(
+                Duration::from_secs(3).as_nanos() as u64
+            )),
             size: None,
             range: None,
             extra_display: None,
@@ -1290,7 +1349,7 @@ mod tests {
             op: Operation::Get,
             path: Path::from("test1"),
             timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
-            duration: None,
+            duration_nanos: Arc::new(AtomicU64::new(0)),
             size: Some(200),
             range: None,
             extra_display: None,
@@ -1312,7 +1371,7 @@ mod tests {
             op: Operation::Get,
             path: Path::from("test1"),
             timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
-            duration: None,
+            duration_nanos: Arc::new(AtomicU64::new(0)),
             size: None,
             range: None,
             extra_display: None,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to