This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch fix-mertics
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit d9ac078adfd721921a7ba3d657964222d1fc79ff
Author: Xuanwo <[email protected]>
AuthorDate: Wed Jun 5 20:13:30 2024 +0800

    fix(core/prometheus): Fix metrics from prometheus not correct for reader
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/layers/prometheus.rs        | 235 ++++++++++++------
 core/src/layers/prometheus_client.rs | 447 ++++++++++++++++++++++-------------
 2 files changed, 438 insertions(+), 244 deletions(-)

diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index bd300619f3..6c93ba1d38 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -20,7 +20,6 @@ use std::fmt::Formatter;
 use std::sync::Arc;
 
 use bytes::Buf;
-use futures::FutureExt;
 use futures::TryFutureExt;
 use log::debug;
 use prometheus::core::AtomicU64;
@@ -32,6 +31,7 @@ use prometheus::register_int_counter_vec_with_registry;
 use prometheus::HistogramVec;
 use prometheus::Registry;
 
+use crate::raw::oio::{ReadOperation, WriteOperation};
 use crate::raw::Access;
 use crate::raw::*;
 use crate::*;
@@ -314,9 +314,11 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
             .requests_duration_seconds
             .with_label_values(&labels)
             .start_timer();
+        let res = self.inner.read(path, args).await;
+        timer.observe_duration();
 
-        let read_res = self.inner.read(path, args).await.map(|(rp, r)| {
-            (
+        match res {
+            Ok((rp, r)) => Ok((
                 rp,
                 PrometheusMetricWrapper::new(
                     r,
@@ -325,13 +327,13 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
                     self.scheme,
                     &path.to_string(),
                 ),
-            )
-        });
-        timer.observe_duration();
-        read_res.map_err(|e| {
-            self.stats.increment_errors_total(Operation::Read, e.kind());
-            e
-        })
+            )),
+            Err(err) => {
+                self.stats
+                    .increment_errors_total(Operation::Read, err.kind());
+                Err(err)
+            }
+        }
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
@@ -347,31 +349,26 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
             .requests_duration_seconds
             .with_label_values(&labels)
             .start_timer();
-
-        let write_res = self
-            .inner
-            .write(path, args)
-            .map(|v| {
-                v.map(|(rp, r)| {
-                    (
-                        rp,
-                        PrometheusMetricWrapper::new(
-                            r,
-                            Operation::Write,
-                            self.stats.clone(),
-                            self.scheme,
-                            &path.to_string(),
-                        ),
-                    )
-                })
-            })
-            .await;
+        let res = self.inner.write(path, args).await;
         timer.observe_duration();
-        write_res.map_err(|e| {
-            self.stats
-                .increment_errors_total(Operation::Write, e.kind());
-            e
-        })
+
+        match res {
+            Ok((rp, w)) => Ok((
+                rp,
+                PrometheusMetricWrapper::new(
+                    w,
+                    Operation::Write,
+                    self.stats.clone(),
+                    self.scheme,
+                    &path.to_string(),
+                ),
+            )),
+            Err(err) => {
+                self.stats
+                    .increment_errors_total(Operation::Write, err.kind());
+                Err(err)
+            }
+        }
     }
 
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
@@ -683,10 +680,19 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
     async fn read(&mut self) -> Result<Buffer> {
         let labels = self.stats.generate_metric_label(
             self.scheme.into_static(),
-            Operation::Read.into_static(),
+            ReadOperation::Read.into_static(),
             &self.path,
         );
-        match self.inner.read().await {
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&labels)
+            .start_timer();
+        let res = self.inner.read().await;
+        timer.observe_duration();
+
+        match res {
             Ok(bytes) => {
                 self.stats
                     .bytes_total
@@ -706,22 +712,31 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
     fn read(&mut self) -> Result<Buffer> {
         let labels = self.stats.generate_metric_label(
             self.scheme.into_static(),
-            Operation::BlockingRead.into_static(),
+            ReadOperation::BlockingRead.into_static(),
             &self.path,
         );
-        self.inner
-            .read()
-            .map(|bs| {
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&labels)
+            .start_timer();
+        let res = self.inner.read();
+        timer.observe_duration();
+
+        match res {
+            Ok(bs) => {
                 self.stats
                     .bytes_total
                     .with_label_values(&labels)
                     .observe(bs.remaining() as f64);
-                bs
-            })
-            .map_err(|e| {
-                self.stats.increment_errors_total(self.op, e.kind());
-                e
-            })
+                Ok(bs)
+            }
+            Err(err) => {
+                self.stats.increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
     }
 }
 
@@ -729,37 +744,79 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
     async fn write(&mut self, bs: Buffer) -> Result<usize> {
         let labels = self.stats.generate_metric_label(
             self.scheme.into_static(),
-            Operation::Write.into_static(),
+            WriteOperation::Write.into_static(),
             &self.path,
         );
-        self.inner
-            .write(bs)
-            .await
-            .map(|n| {
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&labels)
+            .start_timer();
+        let res = self.inner.write(bs).await;
+        timer.observe_duration();
+
+        match res {
+            Ok(n) => {
                 self.stats
                     .bytes_total
                     .with_label_values(&labels)
                     .observe(n as f64);
-                n
-            })
-            .map_err(|err| {
+                Ok(n)
+            }
+            Err(err) => {
                 self.stats.increment_errors_total(self.op, err.kind());
-                err
-            })
+                Err(err)
+            }
+        }
     }
 
     async fn abort(&mut self) -> Result<()> {
-        self.inner.abort().await.map_err(|err| {
-            self.stats.increment_errors_total(self.op, err.kind());
-            err
-        })
+        let labels = self.stats.generate_metric_label(
+            self.scheme.into_static(),
+            WriteOperation::Abort.into_static(),
+            &self.path,
+        );
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&labels)
+            .start_timer();
+        let res = self.inner.abort().await;
+        timer.observe_duration();
+
+        match res {
+            Ok(()) => Ok(()),
+            Err(err) => {
+                self.stats.increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
     }
 
     async fn close(&mut self) -> Result<()> {
-        self.inner.close().await.map_err(|err| {
-            self.stats.increment_errors_total(self.op, err.kind());
-            err
-        })
+        let labels = self.stats.generate_metric_label(
+            self.scheme.into_static(),
+            WriteOperation::Close.into_static(),
+            &self.path,
+        );
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&labels)
+            .start_timer();
+        let res = self.inner.close().await;
+        timer.observe_duration();
+
+        match res {
+            Ok(()) => Ok(()),
+            Err(err) => {
+                self.stats.increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
     }
 }
 
@@ -770,26 +827,52 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
             Operation::BlockingWrite.into_static(),
             &self.path,
         );
-        self.inner
-            .write(bs)
-            .map(|n| {
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&labels)
+            .start_timer();
+        let res = self.inner.write(bs);
+        timer.observe_duration();
+
+        match res {
+            Ok(n) => {
                 self.stats
                     .bytes_total
                     .with_label_values(&labels)
                     .observe(n as f64);
-                n
-            })
-            .map_err(|err| {
+                Ok(n)
+            }
+            Err(err) => {
                 self.stats.increment_errors_total(self.op, err.kind());
-                err
-            })
+                Err(err)
+            }
+        }
     }
 
     fn close(&mut self) -> Result<()> {
-        self.inner.close().map_err(|err| {
-            self.stats.increment_errors_total(self.op, err.kind());
-            err
-        })
+        let labels = self.stats.generate_metric_label(
+            self.scheme.into_static(),
+            WriteOperation::BlockingClose.into_static(),
+            &self.path,
+        );
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&labels)
+            .start_timer();
+        let res = self.inner.close();
+        timer.observe_duration();
+
+        match res {
+            Ok(()) => Ok(()),
+            Err(err) => {
+                self.stats.increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
     }
 }
 
diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs
index e72b765216..a6a569005a 100644
--- a/core/src/layers/prometheus_client.rs
+++ b/core/src/layers/prometheus_client.rs
@@ -22,7 +22,6 @@ use std::time::Duration;
 use std::time::Instant;
 
 use bytes::Buf;
-use futures::FutureExt;
 use futures::TryFutureExt;
 use prometheus_client::metrics::counter::Counter;
 use prometheus_client::metrics::family::Family;
@@ -30,6 +29,7 @@ use prometheus_client::metrics::histogram;
 use prometheus_client::metrics::histogram::Histogram;
 use prometheus_client::registry::Registry;
 
+use crate::raw::oio::{ReadOperation, WriteOperation};
 use crate::raw::Access;
 use crate::raw::*;
 use crate::*;
@@ -159,30 +159,30 @@ impl PrometheusClientMetrics {
         }
     }
 
-    fn increment_errors_total(&self, scheme: Scheme, op: Operation, err: ErrorKind) {
+    fn increment_errors_total(&self, scheme: Scheme, op: &'static str, err: ErrorKind) {
         let labels = [
             ("scheme", scheme.into_static()),
-            ("op", op.into_static()),
+            ("op", op),
             ("err", err.into_static()),
         ];
         self.errors_total.get_or_create(&labels).inc();
     }
 
-    fn increment_request_total(&self, scheme: Scheme, op: Operation) {
-        let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
+    fn increment_request_total(&self, scheme: Scheme, op: &'static str) {
+        let labels = [("scheme", scheme.into_static()), ("op", op)];
         self.requests_total.get_or_create(&labels).inc();
     }
 
-    fn observe_bytes_total(&self, scheme: Scheme, op: Operation, bytes: usize) {
-        let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
+    fn observe_bytes_total(&self, scheme: Scheme, op: &'static str, bytes: usize) {
+        let labels = [("scheme", scheme.into_static()), ("op", op)];
         self.bytes_histogram
             .get_or_create(&labels)
             .observe(bytes as f64);
         self.bytes_total.get_or_create(&labels).inc_by(bytes as u64);
     }
 
-    fn observe_request_duration(&self, scheme: Scheme, op: Operation, duration: Duration) {
-        let labels = [("scheme", scheme.into_static()), ("op", op.into_static())];
+    fn observe_request_duration(&self, scheme: Scheme, op: &'static str, duration: Duration) {
+        let labels = [("scheme", scheme.into_static()), ("op", op)];
         self.request_duration_seconds
             .get_or_create(&labels)
             .observe(duration.as_secs_f64());
@@ -219,186 +219,221 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
 
     async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::CreateDir);
+            .increment_request_total(self.scheme, Operation::CreateDir.into_static());
 
         let start_time = Instant::now();
         let create_res = self.inner.create_dir(path, args).await;
 
         self.metrics.observe_request_duration(
             self.scheme,
-            Operation::CreateDir,
+            Operation::CreateDir.into_static(),
             start_time.elapsed(),
         );
         create_res.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::CreateDir, e.kind());
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::CreateDir.into_static(),
+                e.kind(),
+            );
             e
         })
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let start = Instant::now();
+
+        let res = self.inner.read(path, args).await;
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::Read.into_static(),
+            start.elapsed(),
+        );
         self.metrics
-            .increment_request_total(self.scheme, Operation::Read);
+            .increment_request_total(self.scheme, Operation::Read.into_static());
 
-        let read_res = self
-            .inner
-            .read(path, args)
-            .map(|v| {
-                v.map(|(rp, r)| {
-                    (
-                        rp,
-                        PrometheusMetricWrapper::new(
-                            r,
-                            Operation::Read,
-                            self.metrics.clone(),
-                            self.scheme,
-                        ),
-                    )
-                })
-            })
-            .await;
-        read_res.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Read, e.kind());
-            e
-        })
+        match res {
+            Ok((rp, r)) => Ok((
+                rp,
+                PrometheusMetricWrapper::new(r, self.metrics.clone(), self.scheme),
+            )),
+            Err(err) => {
+                self.metrics.increment_errors_total(
+                    self.scheme,
+                    Operation::Read.into_static(),
+                    err.kind(),
+                );
+                Err(err)
+            }
+        }
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let start = Instant::now();
+
         self.metrics
-            .increment_request_total(self.scheme, Operation::Write);
+            .increment_request_total(self.scheme, Operation::Write.into_static());
 
-        let write_res = self
-            .inner
-            .write(path, args)
-            .map(|v| {
-                v.map(|(rp, r)| {
-                    (
-                        rp,
-                        PrometheusMetricWrapper::new(
-                            r,
-                            Operation::Write,
-                            self.metrics.clone(),
-                            self.scheme,
-                        ),
-                    )
-                })
-            })
-            .await;
+        let res = self.inner.write(path, args).await;
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::Write.into_static(),
+            start.elapsed(),
+        );
+        self.metrics
+            .increment_request_total(self.scheme, Operation::Write.into_static());
 
-        write_res.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Write, e.kind());
-            e
-        })
+        match res {
+            Ok((rp, w)) => Ok((
+                rp,
+                PrometheusMetricWrapper::new(w, self.metrics.clone(), self.scheme),
+            )),
+            Err(err) => {
+                self.metrics.increment_errors_total(
+                    self.scheme,
+                    Operation::Write.into_static(),
+                    err.kind(),
+                );
+                Err(err)
+            }
+        }
     }
 
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::Stat);
+            .increment_request_total(self.scheme, Operation::Stat.into_static());
         let start_time = Instant::now();
 
         let stat_res = self
             .inner
             .stat(path, args)
             .inspect_err(|e| {
-                self.metrics
-                    .increment_errors_total(self.scheme, Operation::Stat, e.kind());
+                self.metrics.increment_errors_total(
+                    self.scheme,
+                    Operation::Stat.into_static(),
+                    e.kind(),
+                );
             })
             .await;
 
-        self.metrics
-            .observe_request_duration(self.scheme, Operation::Stat, start_time.elapsed());
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::Stat.into_static(),
+            start_time.elapsed(),
+        );
         stat_res.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Stat, e.kind());
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::Stat.into_static(),
+                e.kind(),
+            );
             e
         })
     }
 
     async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::Delete);
+            .increment_request_total(self.scheme, Operation::Delete.into_static());
         let start_time = Instant::now();
 
         let delete_res = self.inner.delete(path, args).await;
 
-        self.metrics
-            .observe_request_duration(self.scheme, Operation::Delete, start_time.elapsed());
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::Delete.into_static(),
+            start_time.elapsed(),
+        );
         delete_res.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Delete, e.kind());
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::Delete.into_static(),
+                e.kind(),
+            );
             e
         })
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::List);
+            .increment_request_total(self.scheme, Operation::List.into_static());
         let start_time = Instant::now();
 
         let list_res = self.inner.list(path, args).await;
 
-        self.metrics
-            .observe_request_duration(self.scheme, Operation::List, start_time.elapsed());
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::List.into_static(),
+            start_time.elapsed(),
+        );
         list_res.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::List, e.kind());
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::List.into_static(),
+                e.kind(),
+            );
             e
         })
     }
 
     async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::Batch);
+            .increment_request_total(self.scheme, Operation::Batch.into_static());
         let start_time = Instant::now();
 
         let result = self.inner.batch(args).await;
 
-        self.metrics
-            .observe_request_duration(self.scheme, Operation::Batch, start_time.elapsed());
+        self.metrics.observe_request_duration(
+            self.scheme,
+            Operation::Batch.into_static(),
+            start_time.elapsed(),
+        );
         result.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Batch, e.kind());
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::Batch.into_static(),
+                e.kind(),
+            );
             e
         })
     }
 
     async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::Presign);
+            .increment_request_total(self.scheme, Operation::Presign.into_static());
         let start_time = Instant::now();
 
         let result = self.inner.presign(path, args).await;
 
         self.metrics.observe_request_duration(
             self.scheme,
-            Operation::Presign,
+            Operation::Presign.into_static(),
             start_time.elapsed(),
         );
         result.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::Presign, e.kind());
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::Presign.into_static(),
+                e.kind(),
+            );
             e
         })
     }
 
     fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingCreateDir);
+            .increment_request_total(self.scheme, Operation::BlockingCreateDir.into_static());
         let start_time = Instant::now();
 
         let result = self.inner.blocking_create_dir(path, args);
 
         self.metrics.observe_request_duration(
             self.scheme,
-            Operation::BlockingCreateDir,
+            Operation::BlockingCreateDir.into_static(),
             start_time.elapsed(),
         );
         result.map_err(|e| {
             self.metrics.increment_errors_total(
                 self.scheme,
-                Operation::BlockingCreateDir,
+                Operation::BlockingCreateDir.into_static(),
                 e.kind(),
             );
             e
@@ -407,103 +442,108 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
 
     fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingRead);
+            .increment_request_total(self.scheme, Operation::BlockingRead.into_static());
 
         let result = self.inner.blocking_read(path, args).map(|(rp, r)| {
             (
                 rp,
-                PrometheusMetricWrapper::new(
-                    r,
-                    Operation::BlockingRead,
-                    self.metrics.clone(),
-                    self.scheme,
-                ),
+                PrometheusMetricWrapper::new(r, self.metrics.clone(), self.scheme),
             )
         });
 
         result.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::BlockingRead, e.kind());
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::BlockingRead.into_static(),
+                e.kind(),
+            );
             e
         })
     }
 
     fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingWrite);
+            .increment_request_total(self.scheme, Operation::BlockingWrite.into_static());
 
         let result = self.inner.blocking_write(path, args).map(|(rp, r)| {
             (
                 rp,
-                PrometheusMetricWrapper::new(
-                    r,
-                    Operation::BlockingWrite,
-                    self.metrics.clone(),
-                    self.scheme,
-                ),
+                PrometheusMetricWrapper::new(r, self.metrics.clone(), self.scheme),
             )
         });
 
         result.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::BlockingWrite, e.kind());
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::BlockingWrite.into_static(),
+                e.kind(),
+            );
             e
         })
     }
 
     fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingStat);
+            .increment_request_total(self.scheme, Operation::BlockingStat.into_static());
         let start_time = Instant::now();
 
         let result = self.inner.blocking_stat(path, args);
         self.metrics.observe_request_duration(
             self.scheme,
-            Operation::BlockingStat,
+            Operation::BlockingStat.into_static(),
             start_time.elapsed(),
         );
 
         result.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::BlockingStat, e.kind());
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::BlockingStat.into_static(),
+                e.kind(),
+            );
             e
         })
     }
 
     fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingDelete);
+            .increment_request_total(self.scheme, Operation::BlockingDelete.into_static());
         let start_time = Instant::now();
 
         let result = self.inner.blocking_delete(path, args);
 
         self.metrics.observe_request_duration(
             self.scheme,
-            Operation::BlockingDelete,
+            Operation::BlockingDelete.into_static(),
             start_time.elapsed(),
         );
         result.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::BlockingDelete, e.kind());
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::BlockingDelete.into_static(),
+                e.kind(),
+            );
             e
         })
     }
 
     fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
         self.metrics
-            .increment_request_total(self.scheme, Operation::BlockingList);
+            .increment_request_total(self.scheme, Operation::BlockingList.into_static());
         let start_time = Instant::now();
 
         let result = self.inner.blocking_list(path, args);
 
         self.metrics.observe_request_duration(
             self.scheme,
-            Operation::BlockingList,
+            Operation::BlockingList.into_static(),
             start_time.elapsed(),
         );
         result.map_err(|e| {
-            self.metrics
-                .increment_errors_total(self.scheme, Operation::BlockingList, e.kind());
+            self.metrics.increment_errors_total(
+                self.scheme,
+                Operation::BlockingList.into_static(),
+                e.kind(),
+            );
             e
         })
     }
@@ -512,22 +552,16 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
 pub struct PrometheusMetricWrapper<R> {
     inner: R,
 
-    op: Operation,
     metrics: Arc<PrometheusClientMetrics>,
     scheme: Scheme,
-    bytes_total: usize,
-    start_time: Instant,
 }
 
 impl<R> PrometheusMetricWrapper<R> {
-    fn new(inner: R, op: Operation, metrics: Arc<PrometheusClientMetrics>, scheme: Scheme) -> Self {
+    fn new(inner: R, metrics: Arc<PrometheusClientMetrics>, scheme: Scheme) -> Self {
         Self {
             inner,
-            op,
             metrics,
             scheme,
-            bytes_total: 0,
-            start_time: Instant::now(),
         }
     }
 }
@@ -538,15 +572,24 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
 
         match self.inner.read().await {
             Ok(bs) => {
-                self.metrics
-                    .observe_bytes_total(self.scheme, self.op, bs.remaining());
-                self.metrics
-                    .observe_request_duration(self.scheme, self.op, start.elapsed());
+                self.metrics.observe_bytes_total(
+                    self.scheme,
+                    ReadOperation::Read.into_static(),
+                    bs.remaining(),
+                );
+                self.metrics.observe_request_duration(
+                    self.scheme,
+                    ReadOperation::Read.into_static(),
+                    start.elapsed(),
+                );
                 Ok(bs)
             }
             Err(e) => {
-                self.metrics
-                    .increment_errors_total(self.scheme, self.op, e.kind());
+                self.metrics.increment_errors_total(
+                    self.scheme,
+                    ReadOperation::Read.into_static(),
+                    e.kind(),
+                );
                 Err(e)
             }
         }
@@ -559,15 +602,24 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
         self.inner
             .read()
             .map(|bs| {
-                self.metrics
-                    .observe_bytes_total(self.scheme, self.op, bs.remaining());
-                self.metrics
-                    .observe_request_duration(self.scheme, self.op, start.elapsed());
+                self.metrics.observe_bytes_total(
+                    self.scheme,
+                    ReadOperation::BlockingRead.into_static(),
+                    bs.remaining(),
+                );
+                self.metrics.observe_request_duration(
+                    self.scheme,
+                    ReadOperation::BlockingRead.into_static(),
+                    start.elapsed(),
+                );
                 bs
             })
             .map_err(|e| {
-                self.metrics
-                    .increment_errors_total(self.scheme, self.op, e.kind());
+                self.metrics.increment_errors_total(
+                    self.scheme,
+                    ReadOperation::BlockingRead.into_static(),
+                    e.kind(),
+                );
                 e
             })
     }
@@ -581,64 +633,123 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
             .write(bs)
             .await
             .map(|n| {
-                self.metrics.observe_bytes_total(self.scheme, self.op, n);
-                self.metrics
-                    .observe_request_duration(self.scheme, self.op, start.elapsed());
+                self.metrics.observe_bytes_total(
+                    self.scheme,
+                    WriteOperation::Write.into_static(),
+                    n,
+                );
+                self.metrics.observe_request_duration(
+                    self.scheme,
+                    WriteOperation::Write.into_static(),
+                    start.elapsed(),
+                );
                 n
             })
             .map_err(|err| {
-                self.metrics
-                    .increment_errors_total(self.scheme, self.op, err.kind());
+                self.metrics.increment_errors_total(
+                    self.scheme,
+                    WriteOperation::Write.into_static(),
+                    err.kind(),
+                );
                 err
             })
     }
 
     async fn abort(&mut self) -> Result<()> {
-        self.inner.abort().await.map_err(|err| {
-            self.metrics
-                .increment_errors_total(self.scheme, self.op, err.kind());
-            err
-        })
+        let start = Instant::now();
+
+        self.inner
+            .abort()
+            .await
+            .map(|_| {
+                self.metrics.observe_request_duration(
+                    self.scheme,
+                    WriteOperation::Abort.into_static(),
+                    start.elapsed(),
+                );
+            })
+            .map_err(|err| {
+                self.metrics.increment_errors_total(
+                    self.scheme,
+                    WriteOperation::Abort.into_static(),
+                    err.kind(),
+                );
+                err
+            })
     }
 
     async fn close(&mut self) -> Result<()> {
-        self.inner.close().await.map_err(|err| {
-            self.metrics
-                .increment_errors_total(self.scheme, self.op, err.kind());
-            err
-        })
+        let start = Instant::now();
+
+        self.inner
+            .close()
+            .await
+            .map(|_| {
+                self.metrics.observe_request_duration(
+                    self.scheme,
+                    WriteOperation::Close.into_static(),
+                    start.elapsed(),
+                );
+            })
+            .map_err(|err| {
+                self.metrics.increment_errors_total(
+                    self.scheme,
+                    WriteOperation::Close.into_static(),
+                    err.kind(),
+                );
+                err
+            })
     }
 }
 
 impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
     fn write(&mut self, bs: Buffer) -> Result<usize> {
+        let start = Instant::now();
+
         self.inner
             .write(bs)
             .map(|n| {
-                self.bytes_total += n;
+                self.metrics.observe_bytes_total(
+                    self.scheme,
+                    WriteOperation::BlockingWrite.into_static(),
+                    n,
+                );
+                self.metrics.observe_request_duration(
+                    self.scheme,
+                    WriteOperation::BlockingWrite.into_static(),
+                    start.elapsed(),
+                );
                 n
             })
             .map_err(|err| {
-                self.metrics
-                    .increment_errors_total(self.scheme, self.op, err.kind());
+                self.metrics.increment_errors_total(
+                    self.scheme,
+                    WriteOperation::BlockingWrite.into_static(),
+                    err.kind(),
+                );
                 err
             })
     }
 
     fn close(&mut self) -> Result<()> {
-        self.inner.close().map_err(|err| {
-            self.metrics
-                .increment_errors_total(self.scheme, self.op, err.kind());
-            err
-        })
-    }
-}
+        let start = Instant::now();
 
-impl<R> Drop for PrometheusMetricWrapper<R> {
-    fn drop(&mut self) {
-        self.metrics
-            .observe_bytes_total(self.scheme, self.op, self.bytes_total);
-        self.metrics
-            .observe_request_duration(self.scheme, self.op, self.start_time.elapsed());
+        self.inner
+            .close()
+            .map(|_| {
+                self.metrics.observe_request_duration(
+                    self.scheme,
+                    WriteOperation::BlockingClose.into_static(),
+                    start.elapsed(),
+                );
+            })
+            .map_err(|err| {
+                self.metrics.increment_errors_total(
+                    self.scheme,
+                    WriteOperation::BlockingClose.into_static(),
+                    err.kind(),
+                );
+                err
+            })
     }
 }


Reply via email to