This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new dd02e29086 refactor(layers/metrics): rewrite metrics layer using
observe layer (#5098)
dd02e29086 is described below
commit dd02e2908636392df7c4646b1ad4dc747045ec2d
Author: Qinxuan Chen <[email protected]>
AuthorDate: Thu Sep 5 19:34:04 2024 +0800
refactor(layers/metrics): rewrite metrics layer using observe layer (#5098)
* refactor(layers/metrics): rewrite metrics layer using observe layer
* update
* make clippy happy
---
core/src/layers/metrics.rs | 882 +++++++--------------------------------------
1 file changed, 123 insertions(+), 759 deletions(-)
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index f051df08e7..c7fc75fa1f 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -15,63 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-use std::fmt::Debug;
-use std::fmt::Formatter;
use std::sync::Arc;
-use std::time::Instant;
+use std::time::Duration;
-use bytes::Buf;
-use futures::FutureExt;
-use futures::TryFutureExt;
use metrics::counter;
use metrics::histogram;
-use metrics::Counter;
-use metrics::Histogram;
+use metrics::Label;
+use crate::layers::observe;
use crate::raw::*;
use crate::*;
-/// requests_total records all successful requests sent via operator.
-static METRIC_REQUESTS_TOTAL: &str = "opendal_requests_total";
-/// requests_duration_seconds records the duration seconds of successful
-/// requests.
-///
-/// # NOTE
-///
-/// this metric will track the whole lifetime of this request:
-///
-/// - Building request
-/// - Sending request
-/// - Receiving response
-/// - Consuming response
-static METRIC_REQUESTS_DURATION_SECONDS: &str =
"opendal_requests_duration_seconds";
-static METRICS_ERRORS_TOTAL: &str = "opendal_errors_total";
-/// bytes_total records all bytes processed by operator.
-static METRIC_BYTES_TOTAL: &str = "opendal_bytes_total";
-
-/// The scheme of the service.
-static LABEL_SERVICE: &str = "service";
-/// The operation of this request.
-static LABEL_OPERATION: &str = "operation";
-/// The error kind of this failed request.
-static LABEL_ERROR: &str = "error";
-
/// Add [metrics](https://docs.rs/metrics/) for every operation.
///
/// # Metrics
///
-/// - `opendal_requests_total`: Total request numbers.
-/// - `opendal_requests_duration_seconds`: Request duration seconds.
-/// - `opendal_errors_total`: Total error numbers.
-/// - `opendal_bytes_total`: bytes read/write from/to underlying storage.
-///
-/// # Labels
-///
-/// metrics will carry the following labels
-///
-/// - `service`: Service name from [`Scheme`]
-/// - `operation`: Operation name from [`Operation`]
-/// - `error`: [`ErrorKind`] received by requests
+/// We provide several metrics, please see the documentation of [`observe`]
module.
///
/// # Notes
///
@@ -82,15 +41,17 @@ static LABEL_ERROR: &str = "error";
/// # Examples
///
/// ```no_run
-/// use anyhow::Result;
-/// use opendal::layers::MetricsLayer;
-/// use opendal::services;
-/// use opendal::Operator;
-///
-/// let _ = Operator::new(services::Memory::default())
-/// .expect("must init")
-/// .layer(MetricsLayer)
-/// .finish();
+/// # use anyhow::Result;
+/// # use opendal::layers::MetricsLayer;
+/// # use opendal::services;
+/// # use opendal::Operator;
+///
+/// # fn main() -> Result<()> {
+/// let _ = Operator::new(services::Memory::default())?
+/// .layer(MetricsLayer::default())
+/// .finish();
+/// Ok(())
+/// # }
/// ```
///
/// # Output
@@ -108,735 +69,138 @@ static LABEL_ERROR: &str = "error";
/// let (recorder, exporter) = builder.build().expect("failed to build
recorder/exporter");
/// let recorder = builder.build_recorder().expect("failed to build recorder");
/// ```
-#[derive(Debug, Copy, Clone)]
-pub struct MetricsLayer;
-
-impl<A: Access> Layer<A> for MetricsLayer {
- type LayeredAccess = MetricsAccessor<A>;
-
- fn layer(&self, inner: A) -> Self::LayeredAccess {
- let meta = inner.info();
-
- MetricsAccessor {
- inner,
- handle: Arc::new(MetricsHandler::new(meta.scheme().into_static())),
- }
- }
-}
-
-/// metrics will hold all metrics handlers in a `RwLock<HashMap>`.
-///
-/// By holding all metrics handlers we needed, we can reduce the lock
-/// cost on fetching them. All metrics update will be atomic operations.
-struct MetricsHandler {
- service: &'static str,
-
- requests_total_metadata: Counter,
- requests_duration_seconds_metadata: Histogram,
-
- requests_total_create: Counter,
- requests_duration_seconds_create: Histogram,
-
- requests_total_read: Counter,
- requests_duration_seconds_read: Histogram,
- bytes_total_read: Counter,
-
- requests_total_write: Counter,
- requests_duration_seconds_write: Histogram,
- bytes_total_write: Counter,
-
- requests_total_stat: Counter,
- requests_duration_seconds_stat: Histogram,
-
- requests_total_delete: Counter,
- requests_duration_seconds_delete: Histogram,
-
- requests_total_list: Counter,
- requests_duration_seconds_list: Histogram,
-
- requests_total_presign: Counter,
- requests_duration_seconds_presign: Histogram,
-
- requests_total_batch: Counter,
- requests_duration_seconds_batch: Histogram,
-
- requests_total_blocking_create: Counter,
- requests_duration_seconds_blocking_create: Histogram,
-
- requests_total_blocking_read: Counter,
- requests_duration_seconds_blocking_read: Histogram,
- bytes_total_blocking_read: Counter,
-
- requests_total_blocking_write: Counter,
- requests_duration_seconds_blocking_write: Histogram,
- #[allow(dead_code)]
- bytes_total_blocking_write: Counter,
-
- requests_total_blocking_stat: Counter,
- requests_duration_seconds_blocking_stat: Histogram,
-
- requests_total_blocking_delete: Counter,
- requests_duration_seconds_blocking_delete: Histogram,
-
- requests_total_blocking_list: Counter,
- requests_duration_seconds_blocking_list: Histogram,
+#[derive(Clone, Debug, Default)]
+pub struct MetricsLayer {
+ path_label_level: usize,
}
-impl MetricsHandler {
- fn new(service: &'static str) -> Self {
- Self {
- service,
-
- requests_total_metadata: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Info.into_static(),
- ),
- requests_duration_seconds_metadata: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Info.into_static(),
- ),
-
- requests_total_create: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::CreateDir.into_static(),
- ),
- requests_duration_seconds_create: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::CreateDir.into_static(),
- ),
-
- requests_total_read: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Read.into_static(),
- ),
- requests_duration_seconds_read: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Read.into_static(),
- ),
- bytes_total_read: counter!(
- METRIC_BYTES_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Read.into_static(),
- ),
-
- requests_total_write: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Write.into_static(),
- ),
- requests_duration_seconds_write: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Write.into_static(),
- ),
- bytes_total_write: counter!(
- METRIC_BYTES_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Write.into_static(),
- ),
-
- requests_total_stat: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Stat.into_static(),
- ),
- requests_duration_seconds_stat: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Stat.into_static(),
- ),
-
- requests_total_delete: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Delete.into_static(),
- ),
- requests_duration_seconds_delete: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Delete.into_static(),
- ),
-
- requests_total_list: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::List.into_static(),
- ),
- requests_duration_seconds_list: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::List.into_static(),
- ),
-
- requests_total_presign: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Presign.into_static(),
- ),
- requests_duration_seconds_presign: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Presign.into_static(),
- ),
-
- requests_total_batch: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Batch.into_static(),
- ),
- requests_duration_seconds_batch: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::Batch.into_static(),
- ),
-
- requests_total_blocking_create: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingCreateDir.into_static(),
- ),
- requests_duration_seconds_blocking_create: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingCreateDir.into_static(),
- ),
-
- requests_total_blocking_read: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingRead.into_static(),
- ),
- requests_duration_seconds_blocking_read: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingRead.into_static(),
- ),
- bytes_total_blocking_read: counter!(
- METRIC_BYTES_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingRead.into_static(),
- ),
-
- requests_total_blocking_write: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingWrite.into_static(),
- ),
- requests_duration_seconds_blocking_write: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingWrite.into_static(),
- ),
- bytes_total_blocking_write: counter!(
- METRIC_BYTES_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingWrite.into_static(),
- ),
-
- requests_total_blocking_stat: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingStat.into_static(),
- ),
- requests_duration_seconds_blocking_stat: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingStat.into_static(),
- ),
-
- requests_total_blocking_delete: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingDelete.into_static(),
- ),
- requests_duration_seconds_blocking_delete: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingDelete.into_static(),
- ),
-
- requests_total_blocking_list: counter!(
- METRIC_REQUESTS_TOTAL,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingList.into_static(),
- ),
- requests_duration_seconds_blocking_list: histogram!(
- METRIC_REQUESTS_DURATION_SECONDS,
- LABEL_SERVICE => service,
- LABEL_OPERATION => Operation::BlockingList.into_static(),
- ),
- }
- }
-
- /// error handling is the cold path, so we will not init error counters
- /// in advance.
- #[inline]
- fn increment_errors_total(&self, op: Operation, kind: ErrorKind) {
- counter!(METRICS_ERRORS_TOTAL,
- LABEL_SERVICE => self.service,
- LABEL_OPERATION => op.into_static(),
- LABEL_ERROR => kind.into_static(),
- )
- .increment(1)
- }
-}
-
-#[derive(Clone)]
-pub struct MetricsAccessor<A: Access> {
- inner: A,
- handle: Arc<MetricsHandler>,
-}
-
-impl<A: Access> Debug for MetricsAccessor<A> {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("MetricsAccessor")
- .field("inner", &self.inner)
- .finish_non_exhaustive()
+impl MetricsLayer {
+ /// Set the level of path label.
+ ///
+ /// - level = 0: we will ignore the path label.
+ /// - level > 0: the path label will be the path split by "/" and get the
last n level,
+ /// if n=1 and input path is "abc/def/ghi", and then we will get "abc/"
as the path label.
+ pub fn path_label(mut self, level: usize) -> Self {
+ self.path_label_level = level;
+ self
}
}
-impl<A: Access> LayeredAccess for MetricsAccessor<A> {
- type Inner = A;
- type Reader = MetricWrapper<A::Reader>;
- type BlockingReader = MetricWrapper<A::BlockingReader>;
- type Writer = MetricWrapper<A::Writer>;
- type BlockingWriter = MetricWrapper<A::BlockingWriter>;
- type Lister = A::Lister;
- type BlockingLister = A::BlockingLister;
-
- fn inner(&self) -> &Self::Inner {
- &self.inner
- }
-
- fn metadata(&self) -> Arc<AccessorInfo> {
- self.handle.requests_total_metadata.increment(1);
-
- let start = Instant::now();
- let result = self.inner.info();
- let dur = start.elapsed().as_secs_f64();
-
- self.handle.requests_duration_seconds_metadata.record(dur);
-
- result
- }
-
- async fn create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
- self.handle.requests_total_create.increment(1);
-
- let start = Instant::now();
-
- self.inner
- .create_dir(path, args)
- .map(|v| {
- let dur = start.elapsed().as_secs_f64();
-
- self.handle.requests_duration_seconds_create.record(dur);
-
- v.map_err(|e| {
- self.handle
- .increment_errors_total(Operation::CreateDir,
e.kind());
- e
- })
- })
- .await
- }
-
- async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
- self.handle.requests_total_read.increment(1);
-
- let _start = Instant::now();
-
- self.inner
- .read(path, args)
- .map(|v| {
- v.map(|(rp, r)| {
- (
- rp,
- MetricWrapper::new(
- r,
- Operation::Read,
- self.handle.clone(),
- self.handle.bytes_total_read.clone(),
- self.handle.requests_duration_seconds_read.clone(),
- ),
- )
- })
- .map_err(|err| {
- self.handle
- .increment_errors_total(Operation::Read, err.kind());
- err
- })
- })
- .await
- }
-
- async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
- self.handle.requests_total_write.increment(1);
-
- let _start = Instant::now();
-
- self.inner
- .write(path, args)
- .map_ok(|(rp, w)| {
- (
- rp,
- MetricWrapper::new(
- w,
- Operation::Write,
- self.handle.clone(),
- self.handle.bytes_total_write.clone(),
- self.handle.requests_duration_seconds_write.clone(),
- ),
- )
- })
- .inspect_err(|e| {
- self.handle
- .increment_errors_total(Operation::Write, e.kind());
- })
- .await
- }
-
- async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
- self.handle.requests_total_stat.increment(1);
-
- let start = Instant::now();
-
- self.inner
- .stat(path, args)
- .inspect_ok(|_| {
- let dur = start.elapsed().as_secs_f64();
-
- self.handle.requests_duration_seconds_stat.record(dur);
- })
- .inspect_err(|e| {
- self.handle
- .increment_errors_total(Operation::Stat, e.kind());
- })
- .await
- }
-
- async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
- self.handle.requests_total_delete.increment(1);
-
- let start = Instant::now();
-
- self.inner
- .delete(path, args)
- .inspect_ok(|_| {
- let dur = start.elapsed().as_secs_f64();
-
- self.handle.requests_duration_seconds_delete.record(dur);
- })
- .inspect_err(|e| {
- self.handle
- .increment_errors_total(Operation::Delete, e.kind());
- })
- .await
- }
-
- async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Lister)> {
- self.handle.requests_total_list.increment(1);
-
- let start = Instant::now();
-
- self.inner
- .list(path, args)
- .inspect_ok(|_| {
- let dur = start.elapsed().as_secs_f64();
-
- self.handle.requests_duration_seconds_list.record(dur);
- })
- .inspect_err(|e| {
- self.handle
- .increment_errors_total(Operation::List, e.kind());
- })
- .await
- }
-
- async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
- self.handle.requests_total_batch.increment(1);
-
- let start = Instant::now();
- let result = self.inner.batch(args).await;
- let dur = start.elapsed().as_secs_f64();
-
- self.handle.requests_duration_seconds_batch.record(dur);
-
- result.map_err(|e| {
- self.handle
- .increment_errors_total(Operation::Batch, e.kind());
- e
- })
- }
-
- async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
- self.handle.requests_total_presign.increment(1);
-
- let start = Instant::now();
- let result = self.inner.presign(path, args).await;
- let dur = start.elapsed().as_secs_f64();
-
- self.handle.requests_duration_seconds_presign.record(dur);
-
- result.map_err(|e| {
- self.handle
- .increment_errors_total(Operation::Presign, e.kind());
- e
- })
- }
-
- fn blocking_create_dir(&self, path: &str, args: OpCreateDir) ->
Result<RpCreateDir> {
- self.handle.requests_total_blocking_create.increment(1);
-
- let start = Instant::now();
- let result = self.inner.blocking_create_dir(path, args);
- let dur = start.elapsed().as_secs_f64();
-
- self.handle
- .requests_duration_seconds_blocking_create
- .record(dur);
-
- result.map_err(|e| {
- self.handle
- .increment_errors_total(Operation::BlockingCreateDir,
e.kind());
- e
- })
- }
-
- fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::BlockingReader)> {
- self.handle.requests_total_blocking_read.increment(1);
-
- let _start = Instant::now();
- let result = self.inner.blocking_read(path, args).map(|(rp, r)| {
- (
- rp,
- MetricWrapper::new(
- r,
- Operation::BlockingRead,
- self.handle.clone(),
- self.handle.bytes_total_blocking_read.clone(),
-
self.handle.requests_duration_seconds_blocking_read.clone(),
- ),
- )
- });
-
- result.map_err(|e| {
- self.handle
- .increment_errors_total(Operation::BlockingRead, e.kind());
- e
- })
- }
-
- fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
- self.handle.requests_total_blocking_write.increment(1);
-
- let start = Instant::now();
- let result = self.inner.blocking_write(path, args);
- let dur = start.elapsed().as_secs_f64();
-
- self.handle
- .requests_duration_seconds_blocking_write
- .record(dur);
-
- result
- .map(|(rp, w)| {
- (
- rp,
- MetricWrapper::new(
- w,
- Operation::BlockingWrite,
- self.handle.clone(),
- self.handle.bytes_total_write.clone(),
- self.handle.requests_duration_seconds_write.clone(),
- ),
- )
- })
- .map_err(|e| {
- self.handle
- .increment_errors_total(Operation::BlockingWrite,
e.kind());
- e
- })
- }
-
- fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
- self.handle.requests_total_blocking_stat.increment(1);
-
- let start = Instant::now();
- let result = self.inner.blocking_stat(path, args);
- let dur = start.elapsed().as_secs_f64();
-
- self.handle
- .requests_duration_seconds_blocking_stat
- .record(dur);
-
- result.map_err(|e| {
- self.handle
- .increment_errors_total(Operation::BlockingStat, e.kind());
- e
- })
- }
-
- fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
- self.handle.requests_total_blocking_delete.increment(1);
-
- let start = Instant::now();
- let result = self.inner.blocking_delete(path, args);
- let dur = start.elapsed().as_secs_f64();
-
- self.handle
- .requests_duration_seconds_blocking_delete
- .record(dur);
-
- result.map_err(|e| {
- self.handle
- .increment_errors_total(Operation::BlockingDelete, e.kind());
- e
- })
- }
-
- fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::BlockingLister)> {
- self.handle.requests_total_blocking_list.increment(1);
-
- let start = Instant::now();
- let result = self.inner.blocking_list(path, args);
- let dur = start.elapsed().as_secs_f64();
-
- self.handle
- .requests_duration_seconds_blocking_list
- .record(dur);
+impl<A: Access> Layer<A> for MetricsLayer {
+ type LayeredAccess = observe::MetricsAccessor<A, MetricsInterceptor>;
- result.map_err(|e| {
- self.handle
- .increment_errors_total(Operation::BlockingList, e.kind());
- e
- })
+ fn layer(&self, inner: A) -> Self::LayeredAccess {
+ let interceptor = MetricsInterceptor {
+ path_label_level: self.path_label_level,
+ };
+ observe::MetricsLayer::new(interceptor).layer(inner)
}
}
-pub struct MetricWrapper<R> {
- inner: R,
-
- op: Operation,
- bytes_counter: Counter,
- requests_duration_seconds: Histogram,
- handle: Arc<MetricsHandler>,
+#[derive(Clone, Debug)]
+pub struct MetricsInterceptor {
+ path_label_level: usize,
}
-impl<R> MetricWrapper<R> {
- fn new(
- inner: R,
+impl observe::MetricsIntercept for MetricsInterceptor {
+ fn observe_operation_duration_seconds(
+ &self,
+ scheme: Scheme,
+ namespace: Arc<String>,
+ root: Arc<String>,
+ path: &str,
op: Operation,
- handle: Arc<MetricsHandler>,
- bytes_counter: Counter,
- requests_duration_seconds: Histogram,
- ) -> Self {
- Self {
- inner,
- op,
- handle,
- bytes_counter,
- requests_duration_seconds,
+ duration: Duration,
+ ) {
+ let labels = OperationLabels {
+ scheme,
+ namespace,
+ root,
+ path,
+ operation: op,
+ error: None,
}
+ .into_labels(self.path_label_level);
+ histogram!(observe::METRIC_OPERATION_DURATION_SECONDS.name(),
labels).record(duration)
}
-}
-impl<R: oio::Read> oio::Read for MetricWrapper<R> {
- async fn read(&mut self) -> Result<Buffer> {
- let start = Instant::now();
-
- match self.inner.read().await {
- Ok(bs) => {
- self.bytes_counter.increment(bs.remaining() as u64);
- self.requests_duration_seconds
- .record(start.elapsed().as_secs_f64());
- Ok(bs)
- }
- Err(e) => {
- self.handle.increment_errors_total(self.op, e.kind());
- Err(e)
- }
+ fn observe_operation_bytes(
+ &self,
+ scheme: Scheme,
+ namespace: Arc<String>,
+ root: Arc<String>,
+ path: &str,
+ op: Operation,
+ bytes: usize,
+ ) {
+ let labels = OperationLabels {
+ scheme,
+ namespace,
+ root,
+ path,
+ operation: op,
+ error: None,
}
+ .into_labels(self.path_label_level);
+ histogram!(observe::METRIC_OPERATION_BYTES.name(),
labels).record(bytes as f64)
}
-}
-
-impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
- fn read(&mut self) -> Result<Buffer> {
- let start = Instant::now();
- self.inner
- .read()
- .map(|bs| {
- self.bytes_counter.increment(bs.remaining() as u64);
- self.requests_duration_seconds
- .record(start.elapsed().as_secs_f64());
- bs
- })
- .map_err(|e| {
- self.handle.increment_errors_total(self.op, e.kind());
- e
- })
+ fn observe_operation_errors_total(
+ &self,
+ scheme: Scheme,
+ namespace: Arc<String>,
+ root: Arc<String>,
+ path: &str,
+ op: Operation,
+ error: ErrorKind,
+ ) {
+ let labels = OperationLabels {
+ scheme,
+ namespace,
+ root,
+ path,
+ operation: op,
+ error: Some(error),
+ }
+ .into_labels(self.path_label_level);
+ counter!(observe::METRIC_OPERATION_ERRORS_TOTAL.name(),
labels).increment(1)
}
}
-impl<R: oio::Write> oio::Write for MetricWrapper<R> {
- async fn write(&mut self, bs: Buffer) -> Result<()> {
- let start = Instant::now();
- let size = bs.len();
-
- self.inner
- .write(bs)
- .await
- .map(|_| {
- self.bytes_counter.increment(size as u64);
- self.requests_duration_seconds
- .record(start.elapsed().as_secs_f64());
- })
- .map_err(|err| {
- self.handle.increment_errors_total(self.op, err.kind());
- err
- })
- }
-
- async fn abort(&mut self) -> Result<()> {
- self.inner.abort().await.map_err(|err| {
- self.handle.increment_errors_total(self.op, err.kind());
- err
- })
- }
-
- async fn close(&mut self) -> Result<()> {
- self.inner.close().await.map_err(|err| {
- self.handle.increment_errors_total(self.op, err.kind());
- err
- })
- }
+struct OperationLabels<'a> {
+ scheme: Scheme,
+ namespace: Arc<String>,
+ root: Arc<String>,
+ path: &'a str,
+ operation: Operation,
+ error: Option<ErrorKind>,
}
-impl<R: oio::BlockingWrite> oio::BlockingWrite for MetricWrapper<R> {
- fn write(&mut self, bs: Buffer) -> Result<()> {
- let size = bs.len();
+impl<'a> OperationLabels<'a> {
+ /// labels:
+ ///
+ /// 1. `["scheme", "namespace", "root", "operation"]`
+ /// 2. `["scheme", "namespace", "root", "operation", "path"]`
+ /// 3. `["scheme", "namespace", "root", "operation", "error"]`
+ /// 4. `["scheme", "namespace", "root", "operation", "path", "error"]`
+ fn into_labels(self, path_label_level: usize) -> Vec<Label> {
+ let mut labels = Vec::with_capacity(6);
+
+ labels.extend([
+ Label::new(observe::LABEL_SCHEME, self.scheme.into_static()),
+ Label::new(observe::LABEL_NAMESPACE, (*self.namespace).clone()),
+ Label::new(observe::LABEL_ROOT, (*self.root).clone()),
+ Label::new(observe::LABEL_OPERATION, self.operation.into_static()),
+ ]);
+
+ if let Some(path) = observe::path_label_value(self.path,
path_label_level) {
+ labels.push(Label::new(observe::LABEL_PATH, path.to_owned()));
+ }
- self.inner
- .write(bs)
- .map(|_| {
- self.bytes_counter.increment(size as u64);
- })
- .map_err(|err| {
- self.handle.increment_errors_total(self.op, err.kind());
- err
- })
- }
+ if let Some(error) = self.error {
+ labels.push(Label::new(observe::LABEL_ERROR, error.into_static()));
+ }
- fn close(&mut self) -> Result<()> {
- self.inner.close().map_err(|err| {
- self.handle.increment_errors_total(self.op, err.kind());
- err
- })
+ labels
}
}