BlakeOrth commented on code in PR #19127:
URL: https://github.com/apache/datafusion/pull/19127#discussion_r2600527208


##########
datafusion-cli/src/object_storage/instrumented.rs:
##########
@@ -35,14 +35,66 @@ use datafusion::{
     error::DataFusionError,
     execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
 };
-use futures::stream::BoxStream;
+use futures::stream::{BoxStream, Stream};
 use object_store::{
     path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
     ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
 };
 use parking_lot::{Mutex, RwLock};
 use url::Url;
 
+/// A stream wrapper that measures the time until the first response(item or end of stream) is yielded
+struct TimeToFirstItemStream<S> {
+    inner: S,
+    start: Instant,
+    request_index: usize,
+    requests: Arc<Mutex<Vec<RequestDetails>>>,
+    first_item_yielded: bool,
+}
+
+impl<S> TimeToFirstItemStream<S> {
+    fn new(
+        inner: S,
+        start: Instant,
+        request_index: usize,
+        requests: Arc<Mutex<Vec<RequestDetails>>>,
+    ) -> Self {
+        Self {
+            inner,
+            start,
+            request_index,
+            requests,
+            first_item_yielded: false,
+        }
+    }
+}
+
+impl<S> Stream for TimeToFirstItemStream<S>
+where
+    S: Stream<Item = Result<ObjectMeta>> + Unpin,
+{
+    type Item = Result<ObjectMeta>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
+
+        if !self.first_item_yielded && poll_result.is_ready() {
+            self.first_item_yielded = true;
+            let elapsed = self.start.elapsed();

Review Comment:
   I'm somewhat concerned that this elapsed calculation could produce
misleading results in some cases, because `self.start` is set when the stream
is created rather than when it is first polled. That will probably be accurate
in many scenarios, but imagine something like the following:
   
   ```rust
   let mut list_stream =
       TimeToFirstItemStream::new(stream, Instant::now(), 0, requests);
   // The stream is created here, but it has not been polled yet because the
   // caller has not awaited it. The "timer" is nevertheless already running.
   some_long_running_method().await;
   let item = list_stream.next().await.unwrap();
   ```
   
   In this case the elapsed duration would measure both the time spent in
`some_long_running_method()` and the time it took to yield the first element
of the stream.
   I'm wondering if we can set `self.start` once, on the first call to
`poll_next(...)`, and then record the elapsed time the first time the stream
returns `Poll::Ready` (as you've already done here) to get more accurate
results.
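
A minimal sketch of the bookkeeping this suggestion implies, using only the standard library. `FirstPollTimer`, `mark_polled`, and `mark_ready` are hypothetical names that do not appear in the PR; the assumption is that `poll_next` would call `mark_polled()` before polling the inner stream and `mark_ready()` when the result is `Poll::Ready`.

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of the PR): measures the time from the
/// first poll to the first ready result.
#[derive(Debug, Default)]
struct FirstPollTimer {
    /// Set lazily on the first poll; `None` until then.
    start: Option<Instant>,
    /// Set once, when the first ready result is observed.
    elapsed: Option<Duration>,
}

impl FirstPollTimer {
    /// Call at the top of `poll_next`, before polling the inner stream.
    /// Only the first call starts the clock; later calls leave it untouched.
    fn mark_polled(&mut self) {
        self.start.get_or_insert_with(Instant::now);
    }

    /// Call when the inner poll is ready. Returns the elapsed time the first
    /// time only, and `None` if never polled or already recorded.
    fn mark_ready(&mut self) -> Option<Duration> {
        if self.elapsed.is_some() {
            return None;
        }
        let elapsed = self.start?.elapsed();
        self.elapsed = Some(elapsed);
        Some(elapsed)
    }
}
```

With this shape, time spent between constructing the stream and first polling it would not count toward the recorded duration, since the clock starts inside the first `mark_polled()` call.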



##########
datafusion-cli/src/object_storage/instrumented.rs:
##########
@@ -35,14 +35,66 @@ use datafusion::{
     error::DataFusionError,
     execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
 };
-use futures::stream::BoxStream;
+use futures::stream::{BoxStream, Stream};
 use object_store::{
     path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
     ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
 };
 use parking_lot::{Mutex, RwLock};
 use url::Url;
 
+/// A stream wrapper that measures the time until the first response(item or end of stream) is yielded
+struct TimeToFirstItemStream<S> {
+    inner: S,
+    start: Instant,
+    request_index: usize,
+    requests: Arc<Mutex<Vec<RequestDetails>>>,
+    first_item_yielded: bool,
+}
+
+impl<S> TimeToFirstItemStream<S> {
+    fn new(
+        inner: S,
+        start: Instant,
+        request_index: usize,
+        requests: Arc<Mutex<Vec<RequestDetails>>>,
+    ) -> Self {
+        Self {
+            inner,
+            start,
+            request_index,
+            requests,
+            first_item_yielded: false,
+        }
+    }
+}
+
+impl<S> Stream for TimeToFirstItemStream<S>
+where
+    S: Stream<Item = Result<ObjectMeta>> + Unpin,
+{
+    type Item = Result<ObjectMeta>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
+
+        if !self.first_item_yielded && poll_result.is_ready() {
+            self.first_item_yielded = true;
+            let elapsed = self.start.elapsed();
+
+            let mut requests = self.requests.lock();
+            if let Some(request) = requests.get_mut(self.request_index) {
+                request.duration = Some(elapsed);
+            }

Review Comment:
   Based on the current implementation, I believe this strategy is "safe"
(i.e. we won't accidentally modify the duration of a different request,
leading to errant data). However, it relies on the assumption that
`self.requests` never has items removed from the middle of the `Vec`.
   
   It might be useful to find a place to leave a comment noting that `requests` 
should be append-only to make it less likely for this assumption to be broken 
in the future.
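
One way such a note could look is sketched below, under assumptions. `RequestLog`, `push`, and `set_duration` are hypothetical and not part of the PR, and `RequestDetails` is a stand-in that models only the `duration` field. The sketch goes slightly further than a comment by exposing only append and index-based update operations.

```rust
use std::time::Duration;

/// Stand-in for the PR's `RequestDetails`; only the field used here is modeled.
#[derive(Debug, Default)]
struct RequestDetails {
    duration: Option<Duration>,
}

/// Hypothetical append-only log of requests.
///
/// INVARIANT: entries are only ever appended, never removed or reordered,
/// so an index returned by `push` keeps identifying the same request.
#[derive(Debug, Default)]
struct RequestLog {
    entries: Vec<RequestDetails>,
}

impl RequestLog {
    /// Appends a request and returns the stable index that identifies it.
    fn push(&mut self, details: RequestDetails) -> usize {
        self.entries.push(details);
        self.entries.len() - 1
    }

    /// Records a duration for the request at `index`.
    /// Returns `false` if the index is out of range.
    fn set_duration(&mut self, index: usize, duration: Duration) -> bool {
        match self.entries.get_mut(index) {
            Some(request) => {
                request.duration = Some(duration);
                true
            }
            None => false,
        }
    }
}
```

Whether a wrapper type is worth it here, compared with a plain comment on the existing `Vec`, is a judgment call for the PR author.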



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

