This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 39226c3f30 [datafusion-cli] Replace mutex with AtomicU64 for stream
duration tracking in instrumentedObjectStore (#20802)
39226c3f30 is described below
commit 39226c3f30969d7158373cc756f696dc5e550a07
Author: Burak Şen <[email protected]>
AuthorDate: Tue Mar 10 14:35:46 2026 +0300
[datafusion-cli] Replace mutex with AtomicU64 for stream duration tracking
in instrumentedObjectStore (#20802)
## Which issue does this PR close?
Related to #18138 but does not close any issue.
## Rationale for this change
TimeToFirstItemStream held an Arc<Mutex<Vec<RequestDetails>>> and a
request_index so that it could write the duration back into the shared
request list. After seeing @alamb's and @BlakeOrth's review comments on
PR #19127 about improving this, I wanted to make that change.
## What changes are included in this PR?
- Replace Arc<Mutex<Vec<RequestDetails>>> + index in
TimeToFirstItemStream with a per-request Arc<AtomicU64>
- Store duration as nanoseconds in AtomicU64 (0 = not yet set) with
Release/Acquire ordering
- Start the timer lazily on the first poll_next call instead of at
stream creation, so only actual storage latency is measured
## Are these changes tested?
Yes. They are covered by the existing tests, and I've also added timing comparisons.
## Are there any user-facing changes?
No
---
datafusion-cli/src/object_storage/instrumented.rs | 209 ++++++++++++++--------
1 file changed, 134 insertions(+), 75 deletions(-)
diff --git a/datafusion-cli/src/object_storage/instrumented.rs
b/datafusion-cli/src/object_storage/instrumented.rs
index b4f1a043ac..a0321cacb3 100644
--- a/datafusion-cli/src/object_storage/instrumented.rs
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -21,7 +21,7 @@ use std::{
str::FromStr,
sync::{
Arc,
- atomic::{AtomicU8, Ordering},
+ atomic::{AtomicU8, AtomicU64, Ordering},
},
time::Duration,
};
@@ -45,28 +45,25 @@ use object_store::{
use parking_lot::{Mutex, RwLock};
use url::Url;
-/// A stream wrapper that measures the time until the first response(item or
end of stream) is yielded
+/// A stream wrapper that measures the time until the first response(item or
end of stream) is yielded.
+///
+/// The timer starts on the first `poll_next` call (not at stream creation) to
avoid
+/// measuring unrelated work between stream creation and first poll.
+/// Duration is stored as nanoseconds in an `AtomicU64` (0 = not yet set).
struct TimeToFirstItemStream<S> {
inner: S,
- start: Instant,
- request_index: usize,
- requests: Arc<Mutex<Vec<RequestDetails>>>,
- first_item_yielded: bool,
+ start: Option<Instant>,
+ request_duration: Arc<AtomicU64>,
+ duration_recorded: bool,
}
impl<S> TimeToFirstItemStream<S> {
- fn new(
- inner: S,
- start: Instant,
- request_index: usize,
- requests: Arc<Mutex<Vec<RequestDetails>>>,
- ) -> Self {
+ fn new(inner: S, request_duration: Arc<AtomicU64>) -> Self {
Self {
inner,
- start,
- request_index,
- requests,
- first_item_yielded: false,
+ start: None,
+ request_duration,
+ duration_recorded: false,
}
}
}
@@ -81,16 +78,14 @@ where
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
- let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
+ let start = *self.start.get_or_insert_with(Instant::now);
- if !self.first_item_yielded && poll_result.is_ready() {
- self.first_item_yielded = true;
- let elapsed = self.start.elapsed();
+ let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
- let mut requests = self.requests.lock();
- if let Some(request) = requests.get_mut(self.request_index) {
- request.duration = Some(elapsed);
- }
+ if !self.duration_recorded && poll_result.is_ready() {
+ self.duration_recorded = true;
+ let nanos = start.elapsed().as_nanos() as u64;
+ self.request_duration.store(nanos, Ordering::Release);
}
poll_result
@@ -191,7 +186,7 @@ impl InstrumentedObjectStore {
op: Operation::Put,
path: location.clone(),
timestamp,
- duration: Some(elapsed),
+ duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as
u64)),
size: Some(size),
range: None,
extra_display: None,
@@ -214,7 +209,7 @@ impl InstrumentedObjectStore {
op: Operation::Put,
path: location.clone(),
timestamp,
- duration: Some(elapsed),
+ duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as
u64)),
size: None,
range: None,
extra_display: None,
@@ -249,7 +244,7 @@ impl InstrumentedObjectStore {
op,
path: location.clone(),
timestamp,
- duration: Some(elapsed),
+ duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as
u64)),
size,
range,
extra_display: None,
@@ -274,7 +269,7 @@ impl InstrumentedObjectStore {
op: Operation::Delete,
path: location.clone(),
timestamp,
- duration: Some(elapsed),
+ duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos()
as u64)),
size: None,
range: None,
extra_display: None,
@@ -289,31 +284,20 @@ impl InstrumentedObjectStore {
prefix: Option<&Path>,
) -> BoxStream<'static, Result<ObjectMeta>> {
let timestamp = Utc::now();
- let start = Instant::now();
let inner_stream = self.inner.list(prefix);
- let request_index = {
- let mut requests = self.requests.lock();
- requests.push(RequestDetails {
- op: Operation::List,
- path: prefix.cloned().unwrap_or_else(|| Path::from("")),
- timestamp,
- duration: None,
- size: None,
- range: None,
- extra_display: None,
- });
- requests.len() - 1
- };
-
- let wrapped_stream = TimeToFirstItemStream::new(
- inner_stream,
- start,
- request_index,
- Arc::clone(&self.requests),
- );
+ let duration_nanos = Arc::new(AtomicU64::new(0));
+ self.requests.lock().push(RequestDetails {
+ op: Operation::List,
+ path: prefix.cloned().unwrap_or_else(|| Path::from("")),
+ timestamp,
+ duration_nanos: Arc::clone(&duration_nanos),
+ size: None,
+ range: None,
+ extra_display: None,
+ });
- Box::pin(wrapped_stream)
+ Box::pin(TimeToFirstItemStream::new(inner_stream, duration_nanos))
}
async fn instrumented_list_with_delimiter(
@@ -329,7 +313,7 @@ impl InstrumentedObjectStore {
op: Operation::List,
path: prefix.cloned().unwrap_or_else(|| Path::from("")),
timestamp,
- duration: Some(elapsed),
+ duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as
u64)),
size: None,
range: None,
extra_display: None,
@@ -348,7 +332,7 @@ impl InstrumentedObjectStore {
op: Operation::Copy,
path: from.clone(),
timestamp,
- duration: Some(elapsed),
+ duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as
u64)),
size: None,
range: None,
extra_display: Some(format!("copy_to: {to}")),
@@ -371,7 +355,7 @@ impl InstrumentedObjectStore {
op: Operation::Copy,
path: from.clone(),
timestamp,
- duration: Some(elapsed),
+ duration_nanos: Arc::new(AtomicU64::new(elapsed.as_nanos() as
u64)),
size: None,
range: None,
extra_display: Some(format!("copy_to: {to}")),
@@ -494,17 +478,42 @@ impl fmt::Display for Operation {
}
/// Holds profiling details about individual requests made through an
[`InstrumentedObjectStore`]
-#[derive(Debug)]
pub struct RequestDetails {
op: Operation,
path: Path,
timestamp: chrono::DateTime<Utc>,
- duration: Option<Duration>,
+ /// Duration stored as nanoseconds in an AtomicU64. 0 means not yet set.
+ duration_nanos: Arc<AtomicU64>,
size: Option<usize>,
range: Option<GetRange>,
extra_display: Option<String>,
}
+impl fmt::Debug for RequestDetails {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("RequestDetails")
+ .field("op", &self.op)
+ .field("path", &self.path)
+ .field("timestamp", &self.timestamp)
+ .field("duration", &self.duration())
+ .field("size", &self.size)
+ .field("range", &self.range)
+ .field("extra_display", &self.extra_display)
+ .finish()
+ }
+}
+
+impl RequestDetails {
+ fn duration(&self) -> Option<Duration> {
+ let nanos = self.duration_nanos.load(Ordering::Acquire);
+ if nanos == 0 {
+ None
+ } else {
+ Some(Duration::from_nanos(nanos))
+ }
+ }
+}
+
impl fmt::Display for RequestDetails {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut output_parts = vec![format!(
@@ -513,7 +522,7 @@ impl fmt::Display for RequestDetails {
self.op
)];
- if let Some(d) = self.duration {
+ if let Some(d) = self.duration() {
output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
}
if let Some(s) = self.size {
@@ -700,7 +709,7 @@ impl RequestSummary {
}
fn push(&mut self, request: &RequestDetails) {
self.count += 1;
- if let Some(dur) = request.duration {
+ if let Some(dur) = request.duration() {
self.duration_stats.get_or_insert_default().push(dur)
}
if let Some(size) = request.size {
@@ -916,7 +925,7 @@ mod tests {
let request = requests.pop().unwrap();
assert_eq!(request.op, Operation::Get);
assert_eq!(request.path, path);
- assert!(request.duration.is_some());
+ assert!(request.duration().is_some());
assert_eq!(request.size, Some(9));
assert_eq!(request.range, None);
assert!(request.extra_display.is_none());
@@ -945,7 +954,7 @@ mod tests {
let request = requests.pop().unwrap();
assert_eq!(request.op, Operation::Delete);
assert_eq!(request.path, path);
- assert!(request.duration.is_some());
+ assert!(request.duration().is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
@@ -963,19 +972,57 @@ mod tests {
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
let mut stream = instrumented.list(Some(&path));
- // Consume at least one item from the stream to trigger duration
measurement
+ // Sleep between stream creation and first poll to verify the timer
+ // starts on first poll, not at stream creation.
+ let delay = Duration::from_millis(50);
+ tokio::time::sleep(delay).await;
let _ = stream.next().await;
assert_eq!(instrumented.requests.lock().len(), 1);
let request = instrumented.take_requests().pop().unwrap();
assert_eq!(request.op, Operation::List);
assert_eq!(request.path, path);
- assert!(request.duration.is_some());
+ let duration = request
+ .duration()
+ .expect("duration should be set after consuming stream");
+ assert!(
+ duration < delay,
+ "duration {duration:?} should exclude the {delay:?} sleep before
first poll"
+ );
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
}
+ #[tokio::test]
+ async fn time_to_first_item_stream_captures_inner_latency() {
+ let inner_delay = Duration::from_millis(50);
+ let inner_stream = futures::stream::once(async move {
+ tokio::time::sleep(inner_delay).await;
+ Ok(ObjectMeta {
+ location: Path::from("test"),
+ last_modified: Utc::now(),
+ size: 0,
+ e_tag: None,
+ version: None,
+ })
+ })
+ .boxed();
+
+ let duration_nanos = Arc::new(AtomicU64::new(0));
+ let mut stream = Box::pin(TimeToFirstItemStream::new(
+ inner_stream,
+ Arc::clone(&duration_nanos),
+ ));
+ let _ = stream.next().await;
+
+ let recorded =
Duration::from_nanos(duration_nanos.load(Ordering::Acquire));
+ assert!(
+ recorded >= inner_delay,
+ "recorded duration {recorded:?} should be >= inner stream delay
{inner_delay:?}"
+ );
+ }
+
#[tokio::test]
async fn instrumented_store_list_with_delimiter() {
let (instrumented, path) = setup_test_store().await;
@@ -993,7 +1040,7 @@ mod tests {
let request = instrumented.take_requests().pop().unwrap();
assert_eq!(request.op, Operation::List);
assert_eq!(request.path, path);
- assert!(request.duration.is_some());
+ assert!(request.duration().is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
@@ -1024,7 +1071,7 @@ mod tests {
let request = instrumented.take_requests().pop().unwrap();
assert_eq!(request.op, Operation::Put);
assert_eq!(request.path, path);
- assert!(request.duration.is_some());
+ assert!(request.duration().is_some());
assert_eq!(request.size.unwrap(), size);
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
@@ -1059,7 +1106,7 @@ mod tests {
let request = instrumented.take_requests().pop().unwrap();
assert_eq!(request.op, Operation::Put);
assert_eq!(request.path, path);
- assert!(request.duration.is_some());
+ assert!(request.duration().is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
@@ -1087,7 +1134,7 @@ mod tests {
let request = requests.pop().unwrap();
assert_eq!(request.op, Operation::Copy);
assert_eq!(request.path, path);
- assert!(request.duration.is_some());
+ assert!(request.duration().is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert_eq!(
@@ -1126,7 +1173,7 @@ mod tests {
let request = requests.pop().unwrap();
assert_eq!(request.op, Operation::Copy);
assert_eq!(request.path, path);
- assert!(request.duration.is_some());
+ assert!(request.duration().is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert_eq!(
@@ -1156,7 +1203,7 @@ mod tests {
let request = requests.pop().unwrap();
assert_eq!(request.op, Operation::Head);
assert_eq!(request.path, path);
- assert!(request.duration.is_some());
+ assert!(request.duration().is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
@@ -1168,7 +1215,9 @@ mod tests {
op: Operation::Get,
path: Path::from("test"),
timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
- duration: Some(Duration::new(5, 0)),
+ duration_nanos: Arc::new(AtomicU64::new(
+ Duration::new(5, 0).as_nanos() as u64
+ )),
size: Some(10),
range: Some((..10).into()),
extra_display: Some(String::from("extra info")),
@@ -1195,7 +1244,9 @@ mod tests {
op: Operation::Get,
path: Path::from("test1"),
timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
- duration: Some(Duration::from_secs(5)),
+ duration_nanos: Arc::new(AtomicU64::new(
+ Duration::from_secs(5).as_nanos() as u64
+ )),
size: Some(100),
range: None,
extra_display: None,
@@ -1215,7 +1266,9 @@ mod tests {
op: Operation::Get,
path: Path::from("test2"),
timestamp: chrono::DateTime::from_timestamp(1, 0).unwrap(),
- duration: Some(Duration::from_secs(8)),
+ duration_nanos: Arc::new(AtomicU64::new(
+ Duration::from_secs(8).as_nanos() as u64
+ )),
size: Some(150),
range: None,
extra_display: None,
@@ -1224,7 +1277,9 @@ mod tests {
op: Operation::Get,
path: Path::from("test3"),
timestamp: chrono::DateTime::from_timestamp(2, 0).unwrap(),
- duration: Some(Duration::from_secs(2)),
+ duration_nanos: Arc::new(AtomicU64::new(
+ Duration::from_secs(2).as_nanos() as u64
+ )),
size: Some(50),
range: None,
extra_display: None,
@@ -1243,7 +1298,9 @@ mod tests {
op: Operation::Put,
path: Path::from("test4"),
timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
- duration: Some(Duration::from_millis(200)),
+ duration_nanos: Arc::new(AtomicU64::new(
+ Duration::from_millis(200).as_nanos() as u64,
+ )),
size: Some(75),
range: None,
extra_display: None,
@@ -1268,7 +1325,9 @@ mod tests {
op: Operation::Get,
path: Path::from("test1"),
timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
- duration: Some(Duration::from_secs(3)),
+ duration_nanos: Arc::new(AtomicU64::new(
+ Duration::from_secs(3).as_nanos() as u64
+ )),
size: None,
range: None,
extra_display: None,
@@ -1290,7 +1349,7 @@ mod tests {
op: Operation::Get,
path: Path::from("test1"),
timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
- duration: None,
+ duration_nanos: Arc::new(AtomicU64::new(0)),
size: Some(200),
range: None,
extra_display: None,
@@ -1312,7 +1371,7 @@ mod tests {
op: Operation::Get,
path: Path::from("test1"),
timestamp: chrono::DateTime::from_timestamp(0, 0).unwrap(),
- duration: None,
+ duration_nanos: Arc::new(AtomicU64::new(0)),
size: None,
range: None,
extra_display: None,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]