This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bc343b9 feat: add prometheus layer support (#1930)
6bc343b9 is described below

commit 6bc343b9cfbc027c5825645413492db303f49dea
Author: congyi wang <[email protected]>
AuthorDate: Fri Apr 14 13:31:21 2023 +0800

    feat: add prometheus layer support (#1930)
    
    * add basic metrics, WIP, save work
    
    * save work
    
    * save work
    
    * save work, draft, need to add doc
    
    * fix typo
    
    * fix typo
    
    * add example doc
    
    * fmt
    
    * fmt
    
    * minor fix
    
    * fmt
    
    * rebase main and fmt
    
    * update feature flags
    
    * rename
    
    * add basic metrics, WIP, save work
    
    * save work
    
    * save work
    
    * save work, draft, need to add doc
    
    * fix typo
    
    * fix typo
    
    * add example doc
    
    * fmt
    
    * fmt
    
    * minor fix
    
    * fmt
    
    * rebase main and fmt
    
    * rename metrics
    
    * add laber for metrcis
    
    * prometheus optional = true
    
    * fix typo
    
    * fix unit test
    
    * refactor, rename and resolve comments
    
    * add op as labers as well
    
    * resolve comments
    
    * resolve comments
---
 Cargo.lock                    |  37 ++
 core/Cargo.toml               |   4 +
 core/src/docs/features.md     |   1 +
 core/src/layers/mod.rs        |   5 +
 core/src/layers/prometheus.rs | 788 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 835 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index 08454add..8c1f303e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2265,6 +2265,7 @@ dependencies = [
  "percent-encoding",
  "pin-project",
  "pretty_assertions",
+ "prometheus",
  "prost",
  "quick-xml 0.27.1",
  "rand 0.8.5",
@@ -2809,6 +2810,36 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "procfs"
+version = "0.14.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1de8dacb0873f77e6aefc6d71e044761fcc68060290f5b1089fcdf84626bb69"
+dependencies = [
+ "bitflags 1.3.2",
+ "byteorder",
+ "hex",
+ "lazy_static",
+ "rustix",
+]
+
+[[package]]
+name = "prometheus"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c"
+dependencies = [
+ "cfg-if",
+ "fnv",
+ "lazy_static",
+ "libc",
+ "memchr",
+ "parking_lot 0.12.1",
+ "procfs",
+ "protobuf",
+ "thiserror",
+]
+
 [[package]]
 name = "prost"
 version = "0.11.8"
@@ -2832,6 +2863,12 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "protobuf"
+version = "2.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
+
 [[package]]
 name = "pulldown-cmark"
 version = "0.9.2"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index c99fa4da..3484ec82 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -71,6 +71,7 @@ native-tls-vendored = ["reqwest/native-tls-vendored"]
 layers-all = [
   "layers-chaos",
   "layers-metrics",
+  "layers-prometheus",
   "layers-tracing",
   "layers-minitrace",
 ]
@@ -78,6 +79,8 @@ layers-all = [
 layers-chaos = ["dep:rand"]
 # Enable layers metrics support
 layers-metrics = ["dep:metrics"]
+# Enable layers prometheus support
+layers-prometheus = ["dep:prometheus"]
 # Enable layers minitrace support.
 layers-minitrace = ["dep:minitrace"]
 # Enable layers tracing support.
@@ -163,6 +166,7 @@ once_cell = "1"
 parking_lot = "0.12"
 percent-encoding = "2"
 pin-project = "1"
+prometheus = { version = "0.13", features = ["process"], optional = true}
 prost = { version = "0.11", optional = true }
 quick-xml = { version = "0.27", features = ["serialize", "overlapped-lists"] }
 rand = { version = "0.8", optional = true }
diff --git a/core/src/docs/features.md b/core/src/docs/features.md
index bac81cd6..0f3ed4e4 100644
--- a/core/src/docs/features.md
+++ b/core/src/docs/features.md
@@ -2,6 +2,7 @@
 
 - `layers-all`: Enable all layers support.
 - `layers-metrics`: Enable metrics layer support.
+- `layers-prometheus`: Enable prometheus layer support.
 - `layers-tracing`: Enable tracing layer support.
 - `layers-chaos`: Enable chaos layer support.
 
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index 8c64455f..956b9792 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -36,6 +36,11 @@ mod metrics;
 #[cfg(feature = "layers-metrics")]
 pub use self::metrics::MetricsLayer;
 
+#[cfg(feature = "layers-prometheus")]
+mod prometheus;
+#[cfg(feature = "layers-prometheus")]
+pub use self::prometheus::PrometheusLayer;
+
 mod retry;
 pub use self::retry::RetryLayer;
 
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
new file mode 100644
index 00000000..6aa979dc
--- /dev/null
+++ b/core/src/layers/prometheus.rs
@@ -0,0 +1,788 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::io;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::TryFutureExt;
+use log::debug;
+
+use prometheus::core::GenericCounterVec;
+use prometheus::exponential_buckets;
+use prometheus::histogram_opts;
+use prometheus::register_histogram_vec_with_registry;
+use prometheus::register_int_counter_vec_with_registry;
+use prometheus::Registry;
+use prometheus::{core::AtomicU64, HistogramVec};
+
+use crate::ops::*;
+use crate::raw::Accessor;
+use crate::raw::*;
+use crate::*;
+/// Add [prometheus](https://docs.rs/prometheus) metrics for every operation.
+///
+/// The layer registers three metric families in the supplied registry:
+/// `requests_total`, `requests_duration_seconds` and `bytes_total`. Each of
+/// them is labelled with `scheme` and `operation`.
+///
+/// # Examples
+///
+/// ```
+/// use log::debug;
+/// use log::info;
+/// use opendal::services;
+/// use opendal::Operator;
+/// use opendal::Result;
+///
+/// use opendal::layers::PrometheusLayer;
+/// use prometheus::Encoder;
+///
+/// /// Visit [`opendal::services`] for more service related config.
+/// /// Visit [`opendal::Object`] for more object level APIs.
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     // Pick a builder and configure it.
+///     let builder = services::Memory::default();
+///     let registry = prometheus::default_registry();
+///
+///     let op = Operator::new(builder)
+///         .expect("must init")
+///         .layer(PrometheusLayer::with_registry(registry.clone()))
+///         .finish();
+///     debug!("operator: {op:?}");
+///
+///     // Write data into object test.
+///     op.write("test", "Hello, World!").await?;
+///     // Read data from object.
+///     let bs = op.read("test").await?;
+///     info!("content: {}", String::from_utf8_lossy(&bs));
+///
+///     // Get object metadata.
+///     let meta = op.stat("test").await?;
+///     info!("meta: {:?}", meta);
+///
+///     // Export prometheus metrics.
+///     let mut buffer = Vec::<u8>::new();
+///     let encoder = prometheus::TextEncoder::new();
+///     encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
+///     println!("## Prometheus Metrics");
+///     println!("{}", String::from_utf8(buffer.clone()).unwrap());
+///     Ok(())
+/// }
+/// ```
+#[derive(Default, Debug, Clone)]
+pub struct PrometheusLayer {
+    // Registry that the metrics of every layered accessor are registered in.
+    registry: Registry,
+}
+
+impl PrometheusLayer {
+    /// Build a [`PrometheusLayer`] whose metrics are registered in `registry`.
+    pub fn with_registry(registry: Registry) -> Self {
+        PrometheusLayer { registry }
+    }
+}
+
+impl<A: Accessor> Layer<A> for PrometheusLayer {
+    type LayeredAccessor = PrometheusAccessor<A>;
+
+    /// Wrap `inner` in a [`PrometheusAccessor`] that reports into this layer's registry.
+    fn layer(&self, inner: A) -> Self::LayeredAccessor {
+        // The scheme is resolved once here and reused as the `scheme` label value.
+        let scheme = inner.info().scheme().to_string();
+        let stats = Arc::new(PrometheusMetrics::new(self.registry.clone()));
+
+        PrometheusAccessor {
+            inner,
+            stats,
+            scheme,
+        }
+    }
+}
+/// [`PrometheusMetrics`] holds the performance and IO metric families
+/// reported by the prometheus layer.
+///
+/// All three families are labelled with `scheme` and `operation`.
+#[derive(Debug)]
+pub struct PrometheusMetrics {
+    /// Counter of how many times each operation has been called.
+    pub requests_total: GenericCounterVec<AtomicU64>,
+    /// Histogram of the time spent in each operation call.
+    pub requests_duration_seconds: HistogramVec,
+    /// Histogram of the byte sizes observed by read and write operations.
+    pub bytes_total: HistogramVec,
+}
+
+impl PrometheusMetrics {
+    /// Create the metric families and register them in `registry`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the histogram buckets cannot be built or if registering any
+    /// of the metrics in `registry` fails.
+    ///
+    /// NOTE(review): registration presumably fails when metrics with the same
+    /// names already exist in `registry` (for example when the layer is
+    /// applied twice with the same registry) -- confirm against the
+    /// prometheus crate.
+    pub fn new(registry: Registry) -> Self {
+        let requests_total = register_int_counter_vec_with_registry!(
+            "requests_total",
+            "Total number of calls of the specific operation",
+            &["scheme", "operation"],
+            registry
+        )
+        .unwrap();
+        let opts = histogram_opts!(
+            "requests_duration_seconds",
+            "Histogram of the time spent on specific operation",
+            exponential_buckets(0.01, 2.0, 16).unwrap()
+        );
+
+        let requests_duration_seconds =
+            register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry)
+                .unwrap();
+
+        // NOTE(review): these buckets start at 0.01 and double 16 times, so the
+        // largest finite bucket is below 400; byte sizes above that all fall
+        // into the `+Inf` bucket. Consider byte-oriented buckets.
+        let opts = histogram_opts!(
+            "bytes_total",
+            "Total size of sync or async Read/Write",
+            exponential_buckets(0.01, 2.0, 16).unwrap()
+        );
+        let bytes_total =
+            register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry)
+                .unwrap();
+
+        Self {
+            requests_total,
+            requests_duration_seconds,
+            bytes_total,
+        }
+    }
+
+    /// Report a failed operation.
+    ///
+    /// Error handling is the cold path, so no error counters are initialised
+    /// in advance; the failure is only written to the debug log.
+    #[inline]
+    fn increment_errors_total(&self, op: Operation, kind: ErrorKind) {
+        debug!(
+            "Prometheus statistics metrics error, operation {} error {}",
+            op.into_static(),
+            kind.into_static()
+        );
+    }
+}
+
+/// Accessor wrapper that records prometheus metrics around every call it
+/// forwards to `inner`.
+#[derive(Clone)]
+pub struct PrometheusAccessor<A: Accessor> {
+    // The wrapped accessor that performs the actual operations.
+    inner: A,
+    // Metric families shared with the readers and writers created by this accessor.
+    stats: Arc<PrometheusMetrics>,
+    // Value used for the `scheme` label of every metric.
+    scheme: String,
+}
+
+impl<A: Accessor> Debug for PrometheusAccessor<A> {
+    /// Print only the wrapped accessor; the remaining fields are elided.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut builder = f.debug_struct("PrometheusAccessor");
+        builder.field("inner", &self.inner);
+        builder.finish_non_exhaustive()
+    }
+}
+
+#[async_trait]
+impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
+    type Inner = A;
+    type Reader = PrometheusMetricWrapper<A::Reader>;
+    type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
+    type Writer = PrometheusMetricWrapper<A::Writer>;
+    type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
+    type Pager = A::Pager;
+    type BlockingPager = A::BlockingPager;
+
+    /// Return a reference to the wrapped accessor.
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    /// Forward `create` to the inner accessor, recording call count and latency.
+    ///
+    /// Errors are reported through `increment_errors_total` and returned unchanged.
+    async fn create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
+        // `requests_total` is declared with two labels (`scheme`, `operation`),
+        // so both values must be supplied here.
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::Create.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::Create.into_static()])
+            .start_timer();
+        let create_res = self.inner.create(path, args).await;
+
+        timer.observe_duration();
+        create_res.map_err(|e| {
+            self.stats
+                .increment_errors_total(Operation::Create, e.kind());
+            e
+        })
+    }
+
+    /// Forward `read` to the inner accessor, recording call count and latency.
+    ///
+    /// On success the reported content length is observed in `bytes_total` and
+    /// the returned reader is wrapped so that later reads are observed as well.
+    /// Errors are reported through `increment_errors_total` and returned unchanged.
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+            .start_timer();
+
+        let read_res = self
+            .inner
+            .read(path, args)
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    self.stats
+                        .bytes_total
+                        .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+                        .observe(rp.metadata().content_length() as f64);
+                    (
+                        rp,
+                        PrometheusMetricWrapper::new(
+                            r,
+                            Operation::Read,
+                            self.stats.clone(),
+                            &self.scheme,
+                        ),
+                    )
+                })
+            })
+            .await;
+        timer.observe_duration();
+        read_res.map_err(|e| {
+            self.stats.increment_errors_total(Operation::Read, e.kind());
+            e
+        })
+    }
+
+    /// Forward `write` to the inner accessor, recording call count and latency.
+    ///
+    /// On success the returned writer is wrapped so that written bytes are
+    /// observed in `bytes_total`. Errors are reported through
+    /// `increment_errors_total` and returned unchanged.
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::Write.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::Write.into_static()])
+            .start_timer();
+
+        let write_res = self
+            .inner
+            .write(path, args)
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    (
+                        rp,
+                        PrometheusMetricWrapper::new(
+                            r,
+                            Operation::Write,
+                            self.stats.clone(),
+                            &self.scheme,
+                        ),
+                    )
+                })
+            })
+            .await;
+        timer.observe_duration();
+        write_res.map_err(|e| {
+            self.stats
+                .increment_errors_total(Operation::Write, e.kind());
+            e
+        })
+    }
+
+    /// Forward `stat` to the inner accessor, recording call count and latency.
+    ///
+    /// Errors are reported through `increment_errors_total` and returned unchanged.
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::Stat.into_static()])
+            .inc();
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::Stat.into_static()])
+            .start_timer();
+
+        // The error is reported exactly once, here; the result is then
+        // returned as-is so a failure is not reported a second time.
+        let stat_res = self
+            .inner
+            .stat(path, args)
+            .inspect_err(|e| {
+                self.stats.increment_errors_total(Operation::Stat, e.kind());
+            })
+            .await;
+        timer.observe_duration();
+        stat_res
+    }
+
+    /// Forward `delete` to the inner accessor, recording call count and latency.
+    ///
+    /// Errors are reported through `increment_errors_total` and returned unchanged.
+    async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        // Label with `Delete` so deletions are not accounted as `stat` calls.
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::Delete.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::Delete.into_static()])
+            .start_timer();
+
+        let delete_res = self.inner.delete(path, args).await;
+        timer.observe_duration();
+        delete_res.map_err(|e| {
+            self.stats
+                .increment_errors_total(Operation::Delete, e.kind());
+            e
+        })
+    }
+
+    /// Forward `list` to the inner accessor, recording call count and latency.
+    ///
+    /// The returned pager is passed through unwrapped. Errors are reported
+    /// through `increment_errors_total` and returned unchanged.
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::List.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::List.into_static()])
+            .start_timer();
+
+        let list_res = self.inner.list(path, args).await;
+
+        timer.observe_duration();
+        list_res.map_err(|e| {
+            self.stats.increment_errors_total(Operation::List, e.kind());
+            e
+        })
+    }
+
+    /// Forward `scan` to the inner accessor, recording call count and latency.
+    ///
+    /// The returned pager is passed through unwrapped. Errors are reported
+    /// through `increment_errors_total` and returned unchanged.
+    async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::Scan.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::Scan.into_static()])
+            .start_timer();
+
+        let scan_res = self.inner.scan(path, args).await;
+        timer.observe_duration();
+        scan_res.map_err(|e| {
+            self.stats.increment_errors_total(Operation::Scan, e.kind());
+            e
+        })
+    }
+
+    /// Forward `batch` to the inner accessor, recording call count and latency.
+    ///
+    /// Errors are reported through `increment_errors_total` and returned unchanged.
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        let op = Operation::Batch;
+        let labels = [self.scheme.as_str(), op.into_static()];
+
+        self.stats.requests_total.with_label_values(&labels).inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&labels)
+            .start_timer();
+        let outcome = self.inner.batch(args).await;
+        timer.observe_duration();
+
+        if let Err(err) = &outcome {
+            self.stats.increment_errors_total(op, err.kind());
+        }
+        outcome
+    }
+
+    /// Forward `presign` to the inner accessor, recording call count and latency.
+    ///
+    /// Errors are reported through `increment_errors_total` and returned unchanged.
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::Presign.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::Presign.into_static()])
+            .start_timer();
+        let result = self.inner.presign(path, args).await;
+        timer.observe_duration();
+
+        result.map_err(|e| {
+            self.stats
+                .increment_errors_total(Operation::Presign, e.kind());
+            e
+        })
+    }
+
+    /// Forward `blocking_create` to the inner accessor, recording call count and latency.
+    ///
+    /// Errors are reported through `increment_errors_total` and returned unchanged.
+    fn blocking_create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::BlockingCreate.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::BlockingCreate.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_create(path, args);
+
+        timer.observe_duration();
+
+        result.map_err(|e| {
+            self.stats
+                .increment_errors_total(Operation::BlockingCreate, e.kind());
+            e
+        })
+    }
+
+    /// Forward `blocking_read` to the inner accessor, recording call count and latency.
+    ///
+    /// On success the reported content length is observed in `bytes_total` and
+    /// the returned reader is wrapped so that later reads are observed as well.
+    /// Errors are reported through `increment_errors_total` and returned unchanged.
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+            .inc();
+
+        // `requests_duration_seconds` is declared with two labels (`scheme`,
+        // `operation`), so both values must be supplied here.
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_read(path, args).map(|(rp, r)| {
+            self.stats
+                .bytes_total
+                .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+                .observe(rp.metadata().content_length() as f64);
+            (
+                rp,
+                PrometheusMetricWrapper::new(
+                    r,
+                    Operation::BlockingRead,
+                    self.stats.clone(),
+                    &self.scheme,
+                ),
+            )
+        });
+        timer.observe_duration();
+        result.map_err(|e| {
+            self.stats
+                .increment_errors_total(Operation::BlockingRead, e.kind());
+            e
+        })
+    }
+
+    /// Forward `blocking_write` to the inner accessor, recording call count and latency.
+    ///
+    /// On success the returned writer is wrapped so that written bytes are
+    /// observed in `bytes_total`. Errors are reported through
+    /// `increment_errors_total` and returned unchanged.
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_write(path, args).map(|(rp, r)| {
+            (
+                rp,
+                PrometheusMetricWrapper::new(
+                    r,
+                    Operation::BlockingWrite,
+                    self.stats.clone(),
+                    &self.scheme,
+                ),
+            )
+        });
+        timer.observe_duration();
+        result.map_err(|e| {
+            self.stats
+                .increment_errors_total(Operation::BlockingWrite, e.kind());
+            e
+        })
+    }
+
+    /// Forward `blocking_stat` to the inner accessor, recording call count and latency.
+    ///
+    /// Errors are reported through `increment_errors_total` and returned unchanged.
+    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::BlockingStat.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_stat(path, args);
+        timer.observe_duration();
+        result.map_err(|e| {
+            self.stats
+                .increment_errors_total(Operation::BlockingStat, e.kind());
+            e
+        })
+    }
+
+    /// Forward `blocking_delete` to the inner accessor, recording call count and latency.
+    ///
+    /// Errors are reported through `increment_errors_total` and returned unchanged.
+    fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::BlockingDelete.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_delete(path, args);
+        timer.observe_duration();
+
+        result.map_err(|e| {
+            self.stats
+                .increment_errors_total(Operation::BlockingDelete, e.kind());
+            e
+        })
+    }
+
+    /// Forward `blocking_list` to the inner accessor, recording call count and latency.
+    ///
+    /// The returned pager is passed through unwrapped. Errors are reported
+    /// through `increment_errors_total` and returned unchanged.
+    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::BlockingList.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_list(path, args);
+        timer.observe_duration();
+
+        result.map_err(|e| {
+            self.stats
+                .increment_errors_total(Operation::BlockingList, e.kind());
+            e
+        })
+    }
+
+    /// Forward `blocking_scan` to the inner accessor, recording call count and latency.
+    ///
+    /// The returned pager is passed through unwrapped. Errors are reported
+    /// through `increment_errors_total` and returned unchanged.
+    fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
+        self.stats
+            .requests_total
+            .with_label_values(&[&self.scheme, Operation::BlockingScan.into_static()])
+            .inc();
+
+        let timer = self
+            .stats
+            .requests_duration_seconds
+            .with_label_values(&[&self.scheme, Operation::BlockingScan.into_static()])
+            .start_timer();
+        let result = self.inner.blocking_scan(path, args);
+        timer.observe_duration();
+        result.map_err(|e| {
+            self.stats
+                .increment_errors_total(Operation::BlockingScan, e.kind());
+            e
+        })
+    }
+}
+
+/// Reader/writer wrapper that observes transferred byte counts in
+/// `bytes_total` and reports errors of the wrapped `inner` value.
+pub struct PrometheusMetricWrapper<R> {
+    // The wrapped reader or writer.
+    inner: R,
+
+    // Operation that created this wrapper; passed along when reporting errors.
+    op: Operation,
+    // Metric families shared with the accessor that created this wrapper.
+    stats: Arc<PrometheusMetrics>,
+    // Value used for the `scheme` label.
+    scheme: String,
+}
+
+impl<R> PrometheusMetricWrapper<R> {
+    /// Wrap `inner`, remembering the originating operation, the shared metrics
+    /// and an owned copy of the scheme label.
+    fn new(inner: R, op: Operation, stats: Arc<PrometheusMetrics>, scheme: &str) -> Self {
+        Self {
+            inner,
+            op,
+            stats,
+            scheme: scheme.to_string(),
+        }
+    }
+}
+
+impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
+    /// Poll the inner reader; the size of every successful read is observed in
+    /// `bytes_total`, errors are reported and returned unchanged.
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
+        self.inner.poll_read(cx, buf).map(|res| match res {
+            Ok(bytes) => {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+                    .observe(bytes as f64);
+                Ok(bytes)
+            }
+            Err(e) => {
+                self.stats.increment_errors_total(self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+
+    /// Poll a seek on the inner reader; only errors are reported.
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
+        self.inner.poll_seek(cx, pos).map(|res| match res {
+            Ok(n) => Ok(n),
+            Err(e) => {
+                self.stats.increment_errors_total(self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+
+    /// Poll the next chunk from the inner reader; the length of every
+    /// successful chunk is observed in `bytes_total`, errors are reported and
+    /// returned unchanged.
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        self.inner.poll_next(cx).map(|res| match res {
+            Some(Ok(bytes)) => {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+                    .observe(bytes.len() as f64);
+                Some(Ok(bytes))
+            }
+            Some(Err(e)) => {
+                self.stats.increment_errors_total(self.op, e.kind());
+                Some(Err(e))
+            }
+            None => None,
+        })
+    }
+}
+
+impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
+    /// Read from the inner reader; the size of every successful read is
+    /// observed in `bytes_total`, errors are reported and returned unchanged.
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        self.inner
+            .read(buf)
+            .map(|n| {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+                    .observe(n as f64);
+                n
+            })
+            .map_err(|e| {
+                self.stats.increment_errors_total(self.op, e.kind());
+                e
+            })
+    }
+
+    /// Seek on the inner reader; only errors are reported.
+    fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
+        self.inner.seek(pos).map_err(|err| {
+            self.stats.increment_errors_total(self.op, err.kind());
+            err
+        })
+    }
+
+    /// Fetch the next chunk from the inner reader; the length of every
+    /// successful chunk is observed in `bytes_total`, errors are reported and
+    /// returned unchanged.
+    fn next(&mut self) -> Option<Result<Bytes>> {
+        self.inner.next().map(|res| match res {
+            Ok(bytes) => {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+                    .observe(bytes.len() as f64);
+                Ok(bytes)
+            }
+            Err(e) => {
+                self.stats.increment_errors_total(self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+}
+
+#[async_trait]
+impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
+    /// Write `bs` through the inner writer; on success its length is observed
+    /// in `bytes_total`, errors are reported and returned unchanged.
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        // Capture the length first: `bs` is moved into the inner writer.
+        let size = bs.len();
+        self.inner
+            .write(bs)
+            .await
+            .map(|_| {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::Write.into_static()])
+                    .observe(size as f64)
+            })
+            .map_err(|err| {
+                self.stats.increment_errors_total(self.op, err.kind());
+                err
+            })
+    }
+
+    /// Append `bs` through the inner writer; on success its length is observed
+    /// in `bytes_total`, errors are reported and returned unchanged.
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        // Capture the length first: `bs` is moved into the inner writer.
+        let size = bs.len();
+        self.inner
+            .append(bs)
+            .await
+            .map(|_| {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::Write.into_static()])
+                    .observe(size as f64)
+            })
+            .map_err(|err| {
+                self.stats.increment_errors_total(self.op, err.kind());
+                err
+            })
+    }
+
+    /// Close the inner writer; only errors are reported.
+    async fn close(&mut self) -> Result<()> {
+        self.inner.close().await.map_err(|err| {
+            self.stats.increment_errors_total(self.op, err.kind());
+            err
+        })
+    }
+}
+
+impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
+    /// Write `bs` through the inner writer; on success its length is observed
+    /// in `bytes_total`, errors are reported and returned unchanged.
+    fn write(&mut self, bs: Bytes) -> Result<()> {
+        // Capture the length first: `bs` is moved into the inner writer.
+        let size = bs.len();
+        self.inner
+            .write(bs)
+            .map(|_| {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
+                    .observe(size as f64)
+            })
+            .map_err(|err| {
+                self.stats.increment_errors_total(self.op, err.kind());
+                err
+            })
+    }
+
+    /// Append `bs` through the inner writer; on success its length is observed
+    /// in `bytes_total`, errors are reported and returned unchanged.
+    fn append(&mut self, bs: Bytes) -> Result<()> {
+        // Capture the length first: `bs` is moved into the inner writer.
+        let size = bs.len();
+        self.inner
+            .append(bs)
+            .map(|_| {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
+                    .observe(size as f64)
+            })
+            .map_err(|err| {
+                self.stats.increment_errors_total(self.op, err.kind());
+                err
+            })
+    }
+
+    /// Close the inner writer; only errors are reported.
+    fn close(&mut self) -> Result<()> {
+        self.inner.close().map_err(|err| {
+            self.stats.increment_errors_total(self.op, err.kind());
+            err
+        })
+    }
+}

Reply via email to