This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch fix-mertics in repository https://gitbox.apache.org/repos/asf/opendal.git
commit d9ac078adfd721921a7ba3d657964222d1fc79ff Author: Xuanwo <[email protected]> AuthorDate: Wed Jun 5 20:13:30 2024 +0800 fix(core/prometheus): Fix metrics from prometheus not correct for reader Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/prometheus.rs | 235 ++++++++++++------ core/src/layers/prometheus_client.rs | 447 ++++++++++++++++++++++------------- 2 files changed, 438 insertions(+), 244 deletions(-) diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index bd300619f3..6c93ba1d38 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -20,7 +20,6 @@ use std::fmt::Formatter; use std::sync::Arc; use bytes::Buf; -use futures::FutureExt; use futures::TryFutureExt; use log::debug; use prometheus::core::AtomicU64; @@ -32,6 +31,7 @@ use prometheus::register_int_counter_vec_with_registry; use prometheus::HistogramVec; use prometheus::Registry; +use crate::raw::oio::{ReadOperation, WriteOperation}; use crate::raw::Access; use crate::raw::*; use crate::*; @@ -314,9 +314,11 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> { .requests_duration_seconds .with_label_values(&labels) .start_timer(); + let res = self.inner.read(path, args).await; + timer.observe_duration(); - let read_res = self.inner.read(path, args).await.map(|(rp, r)| { - ( + match res { + Ok((rp, r)) => Ok(( rp, PrometheusMetricWrapper::new( r, @@ -325,13 +327,13 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> { self.scheme, &path.to_string(), ), - ) - }); - timer.observe_duration(); - read_res.map_err(|e| { - self.stats.increment_errors_total(Operation::Read, e.kind()); - e - }) + )), + Err(err) => { + self.stats + .increment_errors_total(Operation::Read, err.kind()); + Err(err) + } + } } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -347,31 +349,26 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> { .requests_duration_seconds .with_label_values(&labels) .start_timer(); - - let write_res = self - .inner - .write(path, args) - .map(|v| { - v.map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Write, - self.stats.clone(), - self.scheme, - &path.to_string(), - ), - ) - }) - }) - .await; + let res = self.inner.write(path, args).await; timer.observe_duration(); - write_res.map_err(|e| { - self.stats - .increment_errors_total(Operation::Write, e.kind()); - e - }) + + match res { + Ok((rp, w)) => Ok(( + rp, + PrometheusMetricWrapper::new( + w, + Operation::Write, + self.stats.clone(), + self.scheme, + &path.to_string(), + ), + )), + Err(err) => { + self.stats + .increment_errors_total(Operation::Write, err.kind()); + Err(err) + } + } } async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> { @@ -683,10 +680,19 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> { async fn read(&mut self) -> Result<Buffer> { let labels = self.stats.generate_metric_label( self.scheme.into_static(), - Operation::Read.into_static(), + ReadOperation::Read.into_static(), &self.path, ); - match self.inner.read().await { + + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&labels) + .start_timer(); + let res = self.inner.read().await; + timer.observe_duration(); + + match res { Ok(bytes) => { self.stats .bytes_total @@ -706,22 +712,31 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { fn read(&mut self) -> Result<Buffer> { let labels = self.stats.generate_metric_label( self.scheme.into_static(), - Operation::BlockingRead.into_static(), + ReadOperation::BlockingRead.into_static(), &self.path, ); - self.inner - .read() - .map(|bs| { + + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&labels) + .start_timer(); + let res = self.inner.read(); + timer.observe_duration(); + + match res { + Ok(bs) => { self.stats .bytes_total .with_label_values(&labels) .observe(bs.remaining() as f64); - bs - }) - .map_err(|e| { - self.stats.increment_errors_total(self.op, e.kind()); - e - }) + Ok(bs) + } + Err(err) => { + self.stats.increment_errors_total(self.op, err.kind()); + Err(err) + } + } } } @@ -729,37 +744,79 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { async fn write(&mut self, bs: Buffer) -> Result<usize> { let labels = self.stats.generate_metric_label( self.scheme.into_static(), - Operation::Write.into_static(), + WriteOperation::Write.into_static(), &self.path, ); - self.inner - .write(bs) - .await - .map(|n| { + + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&labels) + .start_timer(); + let res = self.inner.write(bs).await; + timer.observe_duration(); + + match res { + Ok(n) => { self.stats .bytes_total .with_label_values(&labels) .observe(n as f64); - n - }) - .map_err(|err| { + Ok(n) + } + Err(err) => { self.stats.increment_errors_total(self.op, err.kind()); - err - }) + Err(err) + } + } } async fn abort(&mut self) -> Result<()> { - self.inner.abort().await.map_err(|err| { - self.stats.increment_errors_total(self.op, err.kind()); - err - }) + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + WriteOperation::Abort.into_static(), + &self.path, + ); + + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&labels) + .start_timer(); + let res = self.inner.abort().await; + timer.observe_duration(); + + match res { + Ok(()) => Ok(()), + Err(err) => { + self.stats.increment_errors_total(self.op, err.kind()); + Err(err) + } + } } async fn close(&mut self) -> Result<()> { - self.inner.close().await.map_err(|err| { - self.stats.increment_errors_total(self.op, err.kind()); - err - }) + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + WriteOperation::Close.into_static(), + &self.path, + ); + + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&labels) + .start_timer(); + let res = self.inner.close().await; + timer.observe_duration(); + + match res { + Ok(()) => Ok(()), + Err(err) => { + self.stats.increment_errors_total(self.op, err.kind()); + Err(err) + } + } } } @@ -770,26 +827,52 @@ impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> { Operation::BlockingWrite.into_static(), &self.path, ); - self.inner - .write(bs) - .map(|n| { + + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&labels) + .start_timer(); + let res = self.inner.write(bs); + timer.observe_duration(); + + match res { + Ok(n) => { self.stats .bytes_total .with_label_values(&labels) .observe(n as f64); - n - }) - .map_err(|err| { + Ok(n) + } + Err(err) => { self.stats.increment_errors_total(self.op, err.kind()); - err - }) + Err(err) + } + } } fn close(&mut self) -> Result<()> { - self.inner.close().map_err(|err| { - self.stats.increment_errors_total(self.op, err.kind()); - err - }) + let labels = self.stats.generate_metric_label( + self.scheme.into_static(), + WriteOperation::BlockingClose.into_static(), + &self.path, + ); + + let timer = self + .stats + .requests_duration_seconds + .with_label_values(&labels) + .start_timer(); + let res = self.inner.close(); + timer.observe_duration(); + + match res { + Ok(()) => Ok(()), + Err(err) => { + self.stats.increment_errors_total(self.op, err.kind()); + Err(err) + } + } } } diff --git a/core/src/layers/prometheus_client.rs b/core/src/layers/prometheus_client.rs index e72b765216..a6a569005a 100644 --- a/core/src/layers/prometheus_client.rs +++ b/core/src/layers/prometheus_client.rs @@ -22,7 +22,6 @@ use std::time::Duration; use std::time::Instant; use bytes::Buf; -use futures::FutureExt; use futures::TryFutureExt; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; @@ -30,6 +29,7 @@ use prometheus_client::metrics::histogram; use prometheus_client::metrics::histogram::Histogram; use prometheus_client::registry::Registry; +use crate::raw::oio::{ReadOperation, WriteOperation}; use crate::raw::Access; use crate::raw::*; use crate::*; @@ -159,30 +159,30 @@ impl PrometheusClientMetrics { } } - fn increment_errors_total(&self, scheme: Scheme, op: Operation, err: ErrorKind) { + fn increment_errors_total(&self, scheme: Scheme, op: &'static str, err: ErrorKind) { let labels = [ ("scheme", scheme.into_static()), - ("op", op.into_static()), + ("op", op), ("err", err.into_static()), ]; self.errors_total.get_or_create(&labels).inc(); } - fn increment_request_total(&self, scheme: Scheme, op: Operation) { - let labels = [("scheme", scheme.into_static()), ("op", op.into_static())]; + fn increment_request_total(&self, scheme: Scheme, op: &'static str) { + let labels = [("scheme", scheme.into_static()), ("op", op)]; self.requests_total.get_or_create(&labels).inc(); } - fn observe_bytes_total(&self, scheme: Scheme, op: Operation, bytes: usize) { - let labels = [("scheme", scheme.into_static()), ("op", op.into_static())]; + fn observe_bytes_total(&self, scheme: Scheme, op: &'static str, bytes: usize) { + let labels = [("scheme", scheme.into_static()), ("op", op)]; self.bytes_histogram .get_or_create(&labels) .observe(bytes as f64); self.bytes_total.get_or_create(&labels).inc_by(bytes as u64); } - fn observe_request_duration(&self, scheme: Scheme, op: Operation, duration: Duration) { - let labels = [("scheme", scheme.into_static()), ("op", op.into_static())]; + fn observe_request_duration(&self, scheme: Scheme, op: &'static str, duration: Duration) { + let labels = [("scheme", scheme.into_static()), ("op", op)]; self.request_duration_seconds .get_or_create(&labels) .observe(duration.as_secs_f64()); @@ -219,186 +219,221 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> { async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { self.metrics - .increment_request_total(self.scheme, Operation::CreateDir); + .increment_request_total(self.scheme, Operation::CreateDir.into_static()); let start_time = Instant::now(); let create_res = self.inner.create_dir(path, args).await; self.metrics.observe_request_duration( self.scheme, - Operation::CreateDir, + Operation::CreateDir.into_static(), start_time.elapsed(), ); create_res.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::CreateDir, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::CreateDir.into_static(), + e.kind(), + ); e }) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let start = Instant::now(); + + let res = self.inner.read(path, args).await; + self.metrics.observe_request_duration( + self.scheme, + Operation::Read.into_static(), + start.elapsed(), + ); self.metrics - .increment_request_total(self.scheme, Operation::Read); + .increment_request_total(self.scheme, Operation::Read.into_static()); - let read_res = self - .inner - .read(path, args) - .map(|v| { - v.map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Read, - self.metrics.clone(), - self.scheme, - ), - ) - }) - }) - .await; - read_res.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::Read, e.kind()); - e - }) + match res { + Ok((rp, r)) => Ok(( + rp, + PrometheusMetricWrapper::new(r, self.metrics.clone(), self.scheme), + )), + Err(err) => { + self.metrics.increment_errors_total( + self.scheme, + Operation::Read.into_static(), + err.kind(), + ); + Err(err) + } + } } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let start = Instant::now(); + self.metrics - .increment_request_total(self.scheme, Operation::Write); + .increment_request_total(self.scheme, Operation::Write.into_static()); - let write_res = self - .inner - .write(path, args) - .map(|v| { - v.map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Write, - self.metrics.clone(), - self.scheme, - ), - ) - }) - }) - .await; + let res = self.inner.write(path, args).await; + self.metrics.observe_request_duration( + self.scheme, + Operation::Write.into_static(), + start.elapsed(), + ); + self.metrics + .increment_request_total(self.scheme, Operation::Write.into_static()); - write_res.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::Write, e.kind()); - e - }) + match res { + Ok((rp, w)) => Ok(( + rp, + PrometheusMetricWrapper::new(w, self.metrics.clone(), self.scheme), + )), + Err(err) => { + self.metrics.increment_errors_total( + self.scheme, + Operation::Write.into_static(), + err.kind(), + ); + Err(err) + } + } } async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> { self.metrics - .increment_request_total(self.scheme, Operation::Stat); + .increment_request_total(self.scheme, Operation::Stat.into_static()); let start_time = Instant::now(); let stat_res = self .inner .stat(path, args) .inspect_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::Stat, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::Stat.into_static(), + e.kind(), + ); }) .await; - self.metrics - .observe_request_duration(self.scheme, Operation::Stat, start_time.elapsed()); + self.metrics.observe_request_duration( + self.scheme, + Operation::Stat.into_static(), + start_time.elapsed(), + ); stat_res.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::Stat, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::Stat.into_static(), + e.kind(), + ); e }) } async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> { self.metrics - .increment_request_total(self.scheme, Operation::Delete); + .increment_request_total(self.scheme, Operation::Delete.into_static()); let start_time = Instant::now(); let delete_res = self.inner.delete(path, args).await; - self.metrics - .observe_request_duration(self.scheme, Operation::Delete, start_time.elapsed()); + self.metrics.observe_request_duration( + self.scheme, + Operation::Delete.into_static(), + start_time.elapsed(), + ); delete_res.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::Delete, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::Delete.into_static(), + e.kind(), + ); e }) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { self.metrics - .increment_request_total(self.scheme, Operation::List); + .increment_request_total(self.scheme, Operation::List.into_static()); let start_time = Instant::now(); let list_res = self.inner.list(path, args).await; - self.metrics - .observe_request_duration(self.scheme, Operation::List, start_time.elapsed()); + self.metrics.observe_request_duration( + self.scheme, + Operation::List.into_static(), + start_time.elapsed(), + ); list_res.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::List, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::List.into_static(), + e.kind(), + ); e }) } async fn batch(&self, args: OpBatch) -> Result<RpBatch> { self.metrics - .increment_request_total(self.scheme, Operation::Batch); + .increment_request_total(self.scheme, Operation::Batch.into_static()); let start_time = Instant::now(); let result = self.inner.batch(args).await; - self.metrics - .observe_request_duration(self.scheme, Operation::Batch, start_time.elapsed()); + self.metrics.observe_request_duration( + self.scheme, + Operation::Batch.into_static(), + start_time.elapsed(), + ); result.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::Batch, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::Batch.into_static(), + e.kind(), + ); e }) } async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> { self.metrics - .increment_request_total(self.scheme, Operation::Presign); + .increment_request_total(self.scheme, Operation::Presign.into_static()); let start_time = Instant::now(); let result = self.inner.presign(path, args).await; self.metrics.observe_request_duration( self.scheme, - Operation::Presign, + Operation::Presign.into_static(), start_time.elapsed(), ); result.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::Presign, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::Presign.into_static(), + e.kind(), + ); e }) } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { self.metrics - .increment_request_total(self.scheme, Operation::BlockingCreateDir); + .increment_request_total(self.scheme, Operation::BlockingCreateDir.into_static()); let start_time = Instant::now(); let result = self.inner.blocking_create_dir(path, args); self.metrics.observe_request_duration( self.scheme, - Operation::BlockingCreateDir, + Operation::BlockingCreateDir.into_static(), start_time.elapsed(), ); result.map_err(|e| { self.metrics.increment_errors_total( self.scheme, - Operation::BlockingCreateDir, + Operation::BlockingCreateDir.into_static(), e.kind(), ); e @@ -407,103 +442,108 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> { fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { self.metrics - .increment_request_total(self.scheme, Operation::BlockingRead); + .increment_request_total(self.scheme, Operation::BlockingRead.into_static()); let result = self.inner.blocking_read(path, args).map(|(rp, r)| { ( rp, - PrometheusMetricWrapper::new( - r, - Operation::BlockingRead, - self.metrics.clone(), - self.scheme, - ), + PrometheusMetricWrapper::new(r, self.metrics.clone(), self.scheme), ) }); result.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::BlockingRead, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::BlockingRead.into_static(), + e.kind(), + ); e }) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { self.metrics - .increment_request_total(self.scheme, Operation::BlockingWrite); + .increment_request_total(self.scheme, Operation::BlockingWrite.into_static()); let result = self.inner.blocking_write(path, args).map(|(rp, r)| { ( rp, - PrometheusMetricWrapper::new( - r, - Operation::BlockingWrite, - self.metrics.clone(), - self.scheme, - ), + PrometheusMetricWrapper::new(r, self.metrics.clone(), self.scheme), ) }); result.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::BlockingWrite, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::BlockingWrite.into_static(), + e.kind(), + ); e }) } fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> { self.metrics - .increment_request_total(self.scheme, Operation::BlockingStat); + .increment_request_total(self.scheme, Operation::BlockingStat.into_static()); let start_time = Instant::now(); let result = self.inner.blocking_stat(path, args); self.metrics.observe_request_duration( self.scheme, - Operation::BlockingStat, + Operation::BlockingStat.into_static(), start_time.elapsed(), ); result.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::BlockingStat, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::BlockingStat.into_static(), + e.kind(), + ); e }) } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> { self.metrics - .increment_request_total(self.scheme, Operation::BlockingDelete); + .increment_request_total(self.scheme, Operation::BlockingDelete.into_static()); let start_time = Instant::now(); let result = self.inner.blocking_delete(path, args); self.metrics.observe_request_duration( self.scheme, - Operation::BlockingDelete, + Operation::BlockingDelete.into_static(), start_time.elapsed(), ); result.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::BlockingDelete, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::BlockingDelete.into_static(), + e.kind(), + ); e }) } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { self.metrics - .increment_request_total(self.scheme, Operation::BlockingList); + .increment_request_total(self.scheme, Operation::BlockingList.into_static()); let start_time = Instant::now(); let result = self.inner.blocking_list(path, args); self.metrics.observe_request_duration( self.scheme, - Operation::BlockingList, + Operation::BlockingList.into_static(), start_time.elapsed(), ); result.map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, Operation::BlockingList, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + Operation::BlockingList.into_static(), + e.kind(), + ); e }) } @@ -512,22 +552,16 @@ impl<A: Access> LayeredAccess for PrometheusAccessor<A> { pub struct PrometheusMetricWrapper<R> { inner: R, - op: Operation, metrics: Arc<PrometheusClientMetrics>, scheme: Scheme, - bytes_total: usize, - start_time: Instant, } impl<R> PrometheusMetricWrapper<R> { - fn new(inner: R, op: Operation, metrics: Arc<PrometheusClientMetrics>, scheme: Scheme) -> Self { + fn new(inner: R, metrics: Arc<PrometheusClientMetrics>, scheme: Scheme) -> Self { Self { inner, - op, metrics, scheme, - bytes_total: 0, - start_time: Instant::now(), } } } @@ -538,15 +572,24 @@ impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> { match self.inner.read().await { Ok(bs) => { - self.metrics - .observe_bytes_total(self.scheme, self.op, bs.remaining()); - self.metrics - .observe_request_duration(self.scheme, self.op, start.elapsed()); + self.metrics.observe_bytes_total( + self.scheme, + ReadOperation::Read.into_static(), + bs.remaining(), + ); + self.metrics.observe_request_duration( + self.scheme, + ReadOperation::Read.into_static(), + start.elapsed(), + ); Ok(bs) } Err(e) => { - self.metrics - .increment_errors_total(self.scheme, self.op, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + ReadOperation::Read.into_static(), + e.kind(), + ); Err(e) } } @@ -559,15 +602,24 @@ impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> { self.inner .read() .map(|bs| { - self.metrics - .observe_bytes_total(self.scheme, self.op, bs.remaining()); - self.metrics - .observe_request_duration(self.scheme, self.op, start.elapsed()); + self.metrics.observe_bytes_total( + self.scheme, + ReadOperation::BlockingRead.into_static(), + bs.remaining(), + ); + self.metrics.observe_request_duration( + self.scheme, + ReadOperation::BlockingRead.into_static(), + start.elapsed(), + ); bs }) .map_err(|e| { - self.metrics - .increment_errors_total(self.scheme, self.op, e.kind()); + self.metrics.increment_errors_total( + self.scheme, + ReadOperation::BlockingRead.into_static(), + e.kind(), + ); e }) } @@ -581,64 +633,123 @@ impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> { .write(bs) .await .map(|n| { - self.metrics.observe_bytes_total(self.scheme, self.op, n); - self.metrics - .observe_request_duration(self.scheme, self.op, start.elapsed()); + self.metrics.observe_bytes_total( + self.scheme, + WriteOperation::Write.into_static(), + n, + ); + self.metrics.observe_request_duration( + self.scheme, + WriteOperation::Write.into_static(), + start.elapsed(), + ); n }) .map_err(|err| { - self.metrics - .increment_errors_total(self.scheme, self.op, err.kind()); + self.metrics.increment_errors_total( + self.scheme, + WriteOperation::Write.into_static(), + err.kind(), + ); err }) } async fn abort(&mut self) -> Result<()> { - self.inner.abort().await.map_err(|err| { - self.metrics - .increment_errors_total(self.scheme, self.op, err.kind()); - err - }) + let start = Instant::now(); + + self.inner + .abort() + .await + .map(|_| { + self.metrics.observe_request_duration( + self.scheme, + WriteOperation::Abort.into_static(), + start.elapsed(), + ); + }) + .map_err(|err| { + self.metrics.increment_errors_total( + self.scheme, + WriteOperation::Abort.into_static(), + err.kind(), + ); + err + }) } async fn close(&mut self) -> Result<()> { - self.inner.close().await.map_err(|err| { - self.metrics - .increment_errors_total(self.scheme, self.op, err.kind()); - err - }) + let start = Instant::now(); + + self.inner + .close() + .await + .map(|_| { + self.metrics.observe_request_duration( + self.scheme, + WriteOperation::Close.into_static(), + start.elapsed(), + ); + }) + .map_err(|err| { + self.metrics.increment_errors_total( + self.scheme, + WriteOperation::Close.into_static(), + err.kind(), + ); + err + }) } } impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> { fn write(&mut self, bs: Buffer) -> Result<usize> { + let start = Instant::now(); + self.inner .write(bs) .map(|n| { - self.bytes_total += n; + self.metrics.observe_bytes_total( + self.scheme, + WriteOperation::BlockingWrite.into_static(), + n, + ); + self.metrics.observe_request_duration( + self.scheme, + WriteOperation::BlockingWrite.into_static(), + start.elapsed(), + ); n }) .map_err(|err| { - self.metrics - .increment_errors_total(self.scheme, self.op, err.kind()); + self.metrics.increment_errors_total( + self.scheme, + WriteOperation::BlockingWrite.into_static(), + err.kind(), + ); err }) } fn close(&mut self) -> Result<()> { - self.inner.close().map_err(|err| { - self.metrics - .increment_errors_total(self.scheme, self.op, err.kind()); - err - }) - } -} + let start = Instant::now(); -impl<R> Drop for PrometheusMetricWrapper<R> { - fn drop(&mut self) { - self.metrics - .observe_bytes_total(self.scheme, self.op, self.bytes_total); - self.metrics - .observe_request_duration(self.scheme, self.op, self.start_time.elapsed()); + self.inner + .close() + .map(|_| { + self.metrics.observe_request_duration( + self.scheme, + WriteOperation::BlockingClose.into_static(), + start.elapsed(), + ); + }) + .map_err(|err| { + self.metrics.increment_errors_total( + self.scheme, + WriteOperation::BlockingClose.into_static(), + err.kind(), + ); + err + }) } }
