This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 6bc343b9 feat: add prometheus layer support (#1930)
6bc343b9 is described below
commit 6bc343b9cfbc027c5825645413492db303f49dea
Author: congyi wang <[email protected]>
AuthorDate: Fri Apr 14 13:31:21 2023 +0800
feat: add prometheus layer support (#1930)
* add basic metrics, WIP, save work
* save work
* save work
* save work, draft, need to add doc
* fix typo
* fix typo
* add example doc
* fmt
* fmt
* minor fix
* fmt
* rebase main and fmt
* update feature flags
* rename
* add basic metrics, WIP, save work
* save work
* save work
* save work, draft, need to add doc
* fix typo
* fix typo
* add example doc
* fmt
* fmt
* minor fix
* fmt
* rebase main and fmt
* rename metrics
* add label for metrics
* prometheus optional = true
* fix typo
* fix unit test
* refactor, rename and resolve comments
* add op as labels as well
* resolve comments
* resolve comments
---
Cargo.lock | 37 ++
core/Cargo.toml | 4 +
core/src/docs/features.md | 1 +
core/src/layers/mod.rs | 5 +
core/src/layers/prometheus.rs | 788 ++++++++++++++++++++++++++++++++++++++++++
5 files changed, 835 insertions(+)
diff --git a/Cargo.lock b/Cargo.lock
index 08454add..8c1f303e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2265,6 +2265,7 @@ dependencies = [
"percent-encoding",
"pin-project",
"pretty_assertions",
+ "prometheus",
"prost",
"quick-xml 0.27.1",
"rand 0.8.5",
@@ -2809,6 +2810,36 @@ dependencies = [
"unicode-ident",
]
+[[package]]
+name = "procfs"
+version = "0.14.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1de8dacb0873f77e6aefc6d71e044761fcc68060290f5b1089fcdf84626bb69"
+dependencies = [
+ "bitflags 1.3.2",
+ "byteorder",
+ "hex",
+ "lazy_static",
+ "rustix",
+]
+
+[[package]]
+name = "prometheus"
+version = "0.13.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "449811d15fbdf5ceb5c1144416066429cf82316e2ec8ce0c1f6f8a02e7bbcf8c"
+dependencies = [
+ "cfg-if",
+ "fnv",
+ "lazy_static",
+ "libc",
+ "memchr",
+ "parking_lot 0.12.1",
+ "procfs",
+ "protobuf",
+ "thiserror",
+]
+
[[package]]
name = "prost"
version = "0.11.8"
@@ -2832,6 +2863,12 @@ dependencies = [
"syn 1.0.109",
]
+[[package]]
+name = "protobuf"
+version = "2.28.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94"
+
[[package]]
name = "pulldown-cmark"
version = "0.9.2"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index c99fa4da..3484ec82 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -71,6 +71,7 @@ native-tls-vendored = ["reqwest/native-tls-vendored"]
layers-all = [
"layers-chaos",
"layers-metrics",
+ "layers-prometheus",
"layers-tracing",
"layers-minitrace",
]
@@ -78,6 +79,8 @@ layers-all = [
layers-chaos = ["dep:rand"]
# Enable layers metrics support
layers-metrics = ["dep:metrics"]
+# Enable layers prometheus support
+layers-prometheus = ["dep:prometheus"]
# Enable layers minitrace support.
layers-minitrace = ["dep:minitrace"]
# Enable layers tracing support.
@@ -163,6 +166,7 @@ once_cell = "1"
parking_lot = "0.12"
percent-encoding = "2"
pin-project = "1"
+prometheus = { version = "0.13", features = ["process"], optional = true}
prost = { version = "0.11", optional = true }
quick-xml = { version = "0.27", features = ["serialize", "overlapped-lists"] }
rand = { version = "0.8", optional = true }
diff --git a/core/src/docs/features.md b/core/src/docs/features.md
index bac81cd6..0f3ed4e4 100644
--- a/core/src/docs/features.md
+++ b/core/src/docs/features.md
@@ -2,6 +2,7 @@
- `layers-all`: Enable all layers support.
- `layers-metrics`: Enable metrics layer support.
+- `layers-prometheus`: Enable prometheus layer support.
- `layers-tracing`: Enable tracing layer support.
- `layers-chaos`: Enable chaos layer support.
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index 8c64455f..956b9792 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -36,6 +36,11 @@ mod metrics;
#[cfg(feature = "layers-metrics")]
pub use self::metrics::MetricsLayer;
+#[cfg(feature = "layers-prometheus")]
+mod prometheus;
+#[cfg(feature = "layers-prometheus")]
+pub use self::prometheus::PrometheusLayer;
+
mod retry;
pub use self::retry::RetryLayer;
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
new file mode 100644
index 00000000..6aa979dc
--- /dev/null
+++ b/core/src/layers/prometheus.rs
@@ -0,0 +1,788 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::io;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::TryFutureExt;
+use log::debug;
+
+use prometheus::core::GenericCounterVec;
+use prometheus::exponential_buckets;
+use prometheus::histogram_opts;
+use prometheus::register_histogram_vec_with_registry;
+use prometheus::register_int_counter_vec_with_registry;
+use prometheus::Registry;
+use prometheus::{core::AtomicU64, HistogramVec};
+
+use crate::ops::*;
+use crate::raw::Accessor;
+use crate::raw::*;
+use crate::*;
+/// Add [prometheus](https://docs.rs/prometheus) metrics for every operation.
+///
+/// When the layer is applied to an accessor it registers three metric
+/// families in its registry, each labelled by `scheme` and `operation`:
+/// the `requests_total` counter and the `requests_duration_seconds` and
+/// `bytes_total` histograms.
+///
+/// # Examples
+///
+/// ```
+/// use log::debug;
+/// use log::info;
+/// use opendal::services;
+/// use opendal::Operator;
+/// use opendal::Result;
+///
+/// use opendal::layers::PrometheusLayer;
+/// use prometheus::Encoder;
+///
+/// /// Visit [`opendal::services`] for more service related config.
+/// /// Visit [`opendal::Object`] for more object level APIs.
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+/// // Pick a builder and configure it.
+/// let builder = services::Memory::default();
+/// let registry = prometheus::default_registry();
+///
+/// let op = Operator::new(builder)
+/// .expect("must init")
+/// .layer(PrometheusLayer::with_registry(registry.clone()))
+/// .finish();
+/// debug!("operator: {op:?}");
+///
+/// // Write data into object test.
+/// op.write("test", "Hello, World!").await?;
+/// // Read data from object.
+/// let bs = op.read("test").await?;
+/// info!("content: {}", String::from_utf8_lossy(&bs));
+///
+/// // Get object metadata.
+/// let meta = op.stat("test").await?;
+/// info!("meta: {:?}", meta);
+///
+/// // Export prometheus metrics.
+/// let mut buffer = Vec::<u8>::new();
+/// let encoder = prometheus::TextEncoder::new();
+/// encoder.encode(&prometheus::gather(), &mut buffer).unwrap();
+/// println!("## Prometheus Metrics");
+/// println!("{}", String::from_utf8(buffer.clone()).unwrap());
+/// Ok(())
+/// }
+/// ```
+#[derive(Default, Debug, Clone)]
+pub struct PrometheusLayer {
+ /// Registry that the layer's metrics are registered into.
+ registry: Registry,
+}
+
+impl PrometheusLayer {
+    /// Build a [`PrometheusLayer`] whose metrics are registered into `registry`.
+    pub fn with_registry(registry: Registry) -> Self {
+        PrometheusLayer { registry }
+    }
+}
+
+impl<A: Accessor> Layer<A> for PrometheusLayer {
+    type LayeredAccessor = PrometheusAccessor<A>;
+
+    /// Wrap `inner` in a [`PrometheusAccessor`] that reports to this layer's
+    /// registry.
+    fn layer(&self, inner: A) -> Self::LayeredAccessor {
+        // Read the scheme first: `inner` is moved into the accessor below.
+        let scheme = inner.info().scheme().to_string();
+        // NOTE(review): a fresh set of metrics is registered on every call, so
+        // applying the layer twice with the same registry presumably fails
+        // inside `PrometheusMetrics::new` - confirm against the prometheus docs.
+        let stats = Arc::new(PrometheusMetrics::new(self.registry.clone()));
+
+        PrometheusAccessor {
+            inner,
+            stats,
+            scheme,
+        }
+    }
+}
+/// [`PrometheusMetrics`] provides the performance and IO metrics.
+///
+/// Every metric family is labelled by `scheme` and `operation`.
+#[derive(Debug)]
+pub struct PrometheusMetrics {
+ /// Counter of how many times each operation has been called.
+ pub requests_total: GenericCounterVec<AtomicU64>,
+ /// Histogram of the time spent in each operation call.
+ pub requests_duration_seconds: HistogramVec,
+ /// Histogram of the byte sizes observed by read and write operations.
+ pub bytes_total: HistogramVec,
+}
+
+impl PrometheusMetrics {
+    /// Create the metric families and register them with `registry`.
+    ///
+    /// All three families (`requests_total`, `requests_duration_seconds` and
+    /// `bytes_total`) use the label names `scheme` and `operation`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if any of the registrations returns an error, because every
+    /// registration result is unwrapped.
+    pub fn new(registry: Registry) -> Self {
+        let requests_total = register_int_counter_vec_with_registry!(
+            "requests_total",
+            "Total number of times each operation has been called",
+            &["scheme", "operation"],
+            registry
+        )
+        .unwrap();
+
+        let opts = histogram_opts!(
+            "requests_duration_seconds",
+            "Histogram of the time spent on specific operation",
+            exponential_buckets(0.01, 2.0, 16).unwrap()
+        );
+        let requests_duration_seconds =
+            register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry)
+                .unwrap();
+
+        // NOTE(review): these are the same buckets as the latency histogram
+        // (0.01 up to roughly 328), which looks too small for byte sizes.
+        // Left unchanged so the exported bucket series stay the same - confirm.
+        let opts = histogram_opts!(
+            "bytes_total",
+            "Histogram of the byte sizes observed by read and write operations",
+            exponential_buckets(0.01, 2.0, 16).unwrap()
+        );
+        let bytes_total =
+            register_histogram_vec_with_registry!(opts, &["scheme", "operation"], registry)
+                .unwrap();
+
+        Self {
+            requests_total,
+            requests_duration_seconds,
+            bytes_total,
+        }
+    }
+
+    /// Report that operation `op` failed with an error of kind `kind`.
+    ///
+    /// Error handling is the cold path, so no error counter is registered in
+    /// advance; at the moment the failure is only written to the debug log.
+    #[inline]
+    fn increment_errors_total(&self, op: Operation, kind: ErrorKind) {
+        debug!(
+            "Prometheus statistics metrics error, operation {} error {}",
+            op.into_static(),
+            kind.into_static()
+        );
+    }
+}
+
+/// Accessor produced by [`PrometheusLayer`]: it forwards every operation to
+/// `inner` and records metrics about the call.
+#[derive(Clone)]
+pub struct PrometheusAccessor<A: Accessor> {
+ /// The wrapped accessor that performs the operations.
+ inner: A,
+ /// Metric handles, shared with the readers and writers this accessor returns.
+ stats: Arc<PrometheusMetrics>,
+ /// Scheme of the wrapped accessor, used as the `scheme` label value.
+ scheme: String,
+}
+
+impl<A: Accessor> Debug for PrometheusAccessor<A> {
+    /// Print only the wrapped accessor; the remaining fields are elided.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("PrometheusAccessor");
+        out.field("inner", &self.inner);
+        out.finish_non_exhaustive()
+    }
+}
+
+impl<A: Accessor> PrometheusAccessor<A> {
+    /// Count one request for `op` and start the latency timer for it.
+    ///
+    /// Both metrics are addressed with the same `[scheme, operation]` label
+    /// pair. Building the pair in one place keeps the number of label values
+    /// in line with the two label names the metrics are registered with;
+    /// `with_label_values` panics on a mismatch.
+    fn start_request(&self, op: Operation) -> prometheus::HistogramTimer {
+        let labels = [self.scheme.as_str(), op.into_static()];
+        self.stats.requests_total.with_label_values(&labels).inc();
+        self.stats
+            .requests_duration_seconds
+            .with_label_values(&labels)
+            .start_timer()
+    }
+
+    /// Report a failure of `op` to the metrics and hand the error back unchanged.
+    fn record_error(&self, op: Operation, err: Error) -> Error {
+        self.stats.increment_errors_total(op, err.kind());
+        err
+    }
+}
+
+/// Every operation follows the same pattern: count the request, time the
+/// call to the inner accessor, and report the error kind if the call fails.
+/// Readers and writers are additionally wrapped in
+/// [`PrometheusMetricWrapper`] so the bytes they transfer are observed too.
+#[async_trait]
+impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
+    type Inner = A;
+    type Reader = PrometheusMetricWrapper<A::Reader>;
+    type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
+    type Writer = PrometheusMetricWrapper<A::Writer>;
+    type BlockingWriter = PrometheusMetricWrapper<A::BlockingWriter>;
+    type Pager = A::Pager;
+    type BlockingPager = A::BlockingPager;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    async fn create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
+        let timer = self.start_request(Operation::Create);
+        let res = self.inner.create(path, args).await;
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::Create, e))
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let timer = self.start_request(Operation::Read);
+        let res = self
+            .inner
+            .read(path, args)
+            .map(|v| {
+                v.map(|(rp, r)| {
+                    // The reported content length is observed up front; the
+                    // wrapped reader observes each chunk it yields as well.
+                    self.stats
+                        .bytes_total
+                        .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+                        .observe(rp.metadata().content_length() as f64);
+                    (
+                        rp,
+                        PrometheusMetricWrapper::new(
+                            r,
+                            Operation::Read,
+                            self.stats.clone(),
+                            &self.scheme,
+                        ),
+                    )
+                })
+            })
+            .await;
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::Read, e))
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let timer = self.start_request(Operation::Write);
+        let res = self
+            .inner
+            .write(path, args)
+            .map(|v| {
+                v.map(|(rp, w)| {
+                    (
+                        rp,
+                        PrometheusMetricWrapper::new(
+                            w,
+                            Operation::Write,
+                            self.stats.clone(),
+                            &self.scheme,
+                        ),
+                    )
+                })
+            })
+            .await;
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::Write, e))
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        let timer = self.start_request(Operation::Stat);
+        // The error is reported exactly once, here; the result is returned
+        // as-is afterwards so a failure is not reported a second time.
+        let res = self
+            .inner
+            .stat(path, args)
+            .inspect_err(|e| {
+                self.stats.increment_errors_total(Operation::Stat, e.kind());
+            })
+            .await;
+        timer.observe_duration();
+
+        res
+    }
+
+    async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        // Recorded under `Operation::Delete`, matching the error reporting below.
+        let timer = self.start_request(Operation::Delete);
+        let res = self.inner.delete(path, args).await;
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::Delete, e))
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
+        let timer = self.start_request(Operation::List);
+        let res = self.inner.list(path, args).await;
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::List, e))
+    }
+
+    async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
+        let timer = self.start_request(Operation::Scan);
+        let res = self.inner.scan(path, args).await;
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::Scan, e))
+    }
+
+    async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+        let timer = self.start_request(Operation::Batch);
+        let res = self.inner.batch(args).await;
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::Batch, e))
+    }
+
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        let timer = self.start_request(Operation::Presign);
+        let res = self.inner.presign(path, args).await;
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::Presign, e))
+    }
+
+    fn blocking_create(&self, path: &str, args: OpCreate) -> Result<RpCreate> {
+        let timer = self.start_request(Operation::BlockingCreate);
+        let res = self.inner.blocking_create(path, args);
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::BlockingCreate, e))
+    }
+
+    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
+        let timer = self.start_request(Operation::BlockingRead);
+        let res = self.inner.blocking_read(path, args).map(|(rp, r)| {
+            // The reported content length is observed up front; the wrapped
+            // reader observes each chunk it yields as well.
+            self.stats
+                .bytes_total
+                .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+                .observe(rp.metadata().content_length() as f64);
+            (
+                rp,
+                PrometheusMetricWrapper::new(
+                    r,
+                    Operation::BlockingRead,
+                    self.stats.clone(),
+                    &self.scheme,
+                ),
+            )
+        });
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::BlockingRead, e))
+    }
+
+    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
+        let timer = self.start_request(Operation::BlockingWrite);
+        let res = self.inner.blocking_write(path, args).map(|(rp, w)| {
+            (
+                rp,
+                PrometheusMetricWrapper::new(
+                    w,
+                    Operation::BlockingWrite,
+                    self.stats.clone(),
+                    &self.scheme,
+                ),
+            )
+        });
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::BlockingWrite, e))
+    }
+
+    fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        let timer = self.start_request(Operation::BlockingStat);
+        let res = self.inner.blocking_stat(path, args);
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::BlockingStat, e))
+    }
+
+    fn blocking_delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+        let timer = self.start_request(Operation::BlockingDelete);
+        let res = self.inner.blocking_delete(path, args);
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::BlockingDelete, e))
+    }
+
+    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
+        let timer = self.start_request(Operation::BlockingList);
+        let res = self.inner.blocking_list(path, args);
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::BlockingList, e))
+    }
+
+    fn blocking_scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::BlockingPager)> {
+        let timer = self.start_request(Operation::BlockingScan);
+        let res = self.inner.blocking_scan(path, args);
+        timer.observe_duration();
+
+        res.map_err(|e| self.record_error(Operation::BlockingScan, e))
+    }
+}
+
+/// Wrapper around a reader or writer `R` that observes the bytes passing
+/// through it and reports the errors it returns.
+pub struct PrometheusMetricWrapper<R> {
+ /// The wrapped reader or writer.
+ inner: R,
+
+ /// Operation this wrapper was created for; passed along when an error is reported.
+ op: Operation,
+ /// Shared metric handles.
+ stats: Arc<PrometheusMetrics>,
+ /// Value used for the `scheme` label.
+ scheme: String,
+}
+
+impl<R> PrometheusMetricWrapper<R> {
+    /// Wrap `inner`, remembering the operation it belongs to, the shared
+    /// metric handles and an owned copy of the scheme label.
+    ///
+    /// `scheme` is taken as `&str` rather than `&String`; callers passing a
+    /// `&String` keep working through deref coercion.
+    fn new(inner: R, op: Operation, stats: Arc<PrometheusMetrics>, scheme: &str) -> Self {
+        Self {
+            inner,
+            op,
+            stats,
+            scheme: scheme.to_string(),
+        }
+    }
+}
+
+impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
+    /// Forward the read; observe the number of bytes read on success and
+    /// report the error kind on failure.
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
+        let polled = self.inner.poll_read(cx, buf);
+        polled.map(|outcome| {
+            outcome
+                .map(|n| {
+                    self.stats
+                        .bytes_total
+                        .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+                        .observe(n as f64);
+                    n
+                })
+                .map_err(|e| {
+                    self.stats.increment_errors_total(self.op, e.kind());
+                    e
+                })
+        })
+    }
+
+    /// Forward the seek; only failures are reported.
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> {
+        let polled = self.inner.poll_seek(cx, pos);
+        polled.map(|outcome| {
+            outcome.map_err(|e| {
+                self.stats.increment_errors_total(self.op, e.kind());
+                e
+            })
+        })
+    }
+
+    /// Forward the stream poll; observe the length of every chunk yielded and
+    /// report the error kind of every failed item.
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        let polled = self.inner.poll_next(cx);
+        polled.map(|item| {
+            item.map(|outcome| {
+                outcome
+                    .map(|chunk| {
+                        self.stats
+                            .bytes_total
+                            .with_label_values(&[&self.scheme, Operation::Read.into_static()])
+                            .observe(chunk.len() as f64);
+                        chunk
+                    })
+                    .map_err(|e| {
+                        self.stats.increment_errors_total(self.op, e.kind());
+                        e
+                    })
+            })
+        })
+    }
+}
+
+impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
+    /// Forward the read; observe the number of bytes read on success and
+    /// report the error kind on failure.
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+        match self.inner.read(buf) {
+            Ok(n) => {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+                    .observe(n as f64);
+                Ok(n)
+            }
+            Err(e) => {
+                self.stats.increment_errors_total(self.op, e.kind());
+                Err(e)
+            }
+        }
+    }
+
+    /// Forward the seek; only failures are reported.
+    fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
+        match self.inner.seek(pos) {
+            Ok(offset) => Ok(offset),
+            Err(err) => {
+                self.stats.increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
+    }
+
+    /// Forward the iteration step; observe the length of every chunk yielded
+    /// and report the error kind of every failed item.
+    fn next(&mut self) -> Option<Result<Bytes>> {
+        let outcome = self.inner.next()?;
+        Some(match outcome {
+            Ok(chunk) => {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()])
+                    .observe(chunk.len() as f64);
+                Ok(chunk)
+            }
+            Err(e) => {
+                self.stats.increment_errors_total(self.op, e.kind());
+                Err(e)
+            }
+        })
+    }
+}
+
+#[async_trait]
+impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
+    /// Forward the write; observe the payload size on success and report the
+    /// error kind on failure.
+    async fn write(&mut self, bs: Bytes) -> Result<()> {
+        // Take the length first: `bs` is moved into the inner writer.
+        let len = bs.len();
+        match self.inner.write(bs).await {
+            Ok(_) => {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::Write.into_static()])
+                    .observe(len as f64);
+                Ok(())
+            }
+            Err(err) => {
+                self.stats.increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
+    }
+
+    /// Forward the append; observe the payload size on success and report the
+    /// error kind on failure.
+    async fn append(&mut self, bs: Bytes) -> Result<()> {
+        // Take the length first: `bs` is moved into the inner writer.
+        let len = bs.len();
+        match self.inner.append(bs).await {
+            Ok(_) => {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::Write.into_static()])
+                    .observe(len as f64);
+                Ok(())
+            }
+            Err(err) => {
+                self.stats.increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
+    }
+
+    /// Forward the close; only failures are reported.
+    async fn close(&mut self) -> Result<()> {
+        match self.inner.close().await {
+            Ok(done) => Ok(done),
+            Err(err) => {
+                self.stats.increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
+    }
+}
+
+impl<R: oio::BlockingWrite> oio::BlockingWrite for PrometheusMetricWrapper<R> {
+    /// Forward the write; observe the payload size on success and report the
+    /// error kind on failure.
+    fn write(&mut self, bs: Bytes) -> Result<()> {
+        // Take the length first: `bs` is moved into the inner writer.
+        let len = bs.len();
+        match self.inner.write(bs) {
+            Ok(_) => {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
+                    .observe(len as f64);
+                Ok(())
+            }
+            Err(err) => {
+                self.stats.increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
+    }
+
+    /// Forward the append; observe the payload size on success and report the
+    /// error kind on failure.
+    fn append(&mut self, bs: Bytes) -> Result<()> {
+        // Take the length first: `bs` is moved into the inner writer.
+        let len = bs.len();
+        match self.inner.append(bs) {
+            Ok(_) => {
+                self.stats
+                    .bytes_total
+                    .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()])
+                    .observe(len as f64);
+                Ok(())
+            }
+            Err(err) => {
+                self.stats.increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
+    }
+
+    /// Forward the close; only failures are reported.
+    fn close(&mut self) -> Result<()> {
+        match self.inner.close() {
+            Ok(done) => Ok(done),
+            Err(err) => {
+                self.stats.increment_errors_total(self.op, err.kind());
+                Err(err)
+            }
+        }
+    }
+}