This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 4c5ea31a9 feat(layers): support await_tree instrument (#2623)
4c5ea31a9 is described below
commit 4c5ea31a9bc51a6abecf29de32287add0687120f
Author: Jun Ouyang <[email protected]>
AuthorDate: Thu Jul 27 00:19:27 2023 +0800
feat(layers): support await_tree instrument (#2623)
* feat(layers): add await-tree async instrument layer
Signed-off-by: owl <[email protected]>
* feat(layers): fix code
Signed-off-by: owl <[email protected]>
* feat(layers): fix code
Signed-off-by: owl <[email protected]>
* feat(layers): fix code
Signed-off-by: owl <[email protected]>
* feat(layers): fix code
Signed-off-by: owl <[email protected]>
* feat(layers): fix code
Signed-off-by: owl <[email protected]>
* feat(layers): fix code
Signed-off-by: owl <[email protected]>
---------
Signed-off-by: owl <[email protected]>
---
Cargo.lock | 84 ++++++++++++++++++++-
core/Cargo.toml | 4 +
core/src/layers/await_tree.rs | 166 ++++++++++++++++++++++++++++++++++++++++++
core/src/layers/mod.rs | 4 +
4 files changed, 257 insertions(+), 1 deletion(-)
diff --git a/Cargo.lock b/Cargo.lock
index 167a6d188..cb0b29ed6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -364,6 +364,23 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
+[[package]]
+name = "await-tree"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "325bcfc4b87d4aa36f1319b806bacc40fcefcaf43a12bd85a5a2f44fc14ce9de"
+dependencies = [
+ "coarsetime",
+ "derive_builder",
+ "flexstr",
+ "indextree",
+ "itertools",
+ "parking_lot 0.12.1",
+ "pin-project",
+ "tokio",
+ "tracing",
+]
+
[[package]]
name = "awaitable"
version = "0.4.0"
@@ -861,6 +878,18 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b"
+[[package]]
+name = "coarsetime"
+version = "0.1.23"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a90d114103adbc625300f346d4d09dfb4ab1c4a8df6868435dd903392ecf4354"
+dependencies = [
+ "libc",
+ "once_cell",
+ "wasi 0.11.0+wasi-snapshot-preview1",
+ "wasm-bindgen",
+]
+
[[package]]
name = "colorchoice"
version = "1.0.0"
@@ -1241,6 +1270,37 @@ dependencies = [
"syn 2.0.23",
]
+[[package]]
+name = "derive_builder"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8"
+dependencies = [
+ "derive_builder_macro",
+]
+
+[[package]]
+name = "derive_builder_core"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f"
+dependencies = [
+ "darling",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
+[[package]]
+name = "derive_builder_macro"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e"
+dependencies = [
+ "derive_builder_core",
+ "syn 1.0.109",
+]
+
[[package]]
name = "derive_destructure2"
version = "0.1.1"
@@ -1474,6 +1534,15 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda653ca797810c02f7ca4b804b40b8b95ae046eb989d356bce17919a8c25499"
+[[package]]
+name = "flexstr"
+version = "0.9.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4d50aef14619d336a54fca5a592d952eb39037b1a1e7e6afd9f91c892ac7ef65"
+dependencies = [
+ "static_assertions",
+]
+
[[package]]
name = "float-cmp"
version = "0.9.0"
@@ -2072,6 +2141,12 @@ dependencies = [
"hashbrown 0.14.0",
]
+[[package]]
+name = "indextree"
+version = "4.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c40411d0e5c63ef1323c3d09ce5ec6d84d71531e18daed0743fccea279d7deb6"
+
[[package]]
name = "indoc"
version = "1.0.9"
@@ -3031,6 +3106,7 @@ dependencies = [
"async-compat",
"async-tls",
"async-trait",
+ "await-tree",
"backon",
"base64 0.21.2",
"bb8",
@@ -3924,7 +4000,7 @@ dependencies = [
"indoc",
"libc",
"memoffset",
- "parking_lot 0.11.2",
+ "parking_lot 0.12.1",
"pyo3-build-config",
"pyo3-ffi",
"pyo3-macros",
@@ -4989,6 +5065,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
+[[package]]
+name = "static_assertions"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
+
[[package]]
name = "strsim"
version = "0.10.0"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 45ef98e7f..8be27e9b3 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -76,6 +76,7 @@ layers-all = [
"layers-tracing",
"layers-minitrace",
"layers-throttle",
+ "layers-await-tree",
]
# Enable layers chaos support
layers-chaos = ["dep:rand"]
@@ -93,6 +94,8 @@ layers-tracing = ["dep:tracing"]
layers-otel-trace = ["dep:opentelemetry"]
# Enable layers throttle support.
layers-throttle = ["dep:governor"]
+# Enable layers await-tree support.
+layers-await-tree = ["dep:await-tree"]
services-azblob = [
"dep:sha2",
@@ -183,6 +186,7 @@ anyhow = { version = "1.0.30", features = ["std"] }
async-compat = "0.2"
async-tls = { version = "0.11", optional = true }
async-trait = "0.1.68"
+await-tree = { version = "0.1.1", optional = true }
backon = "0.4.0"
base64 = "0.21"
bb8 = { version = "0.8", optional = true }
diff --git a/core/src/layers/await_tree.rs b/core/src/layers/await_tree.rs
new file mode 100644
index 000000000..583462df6
--- /dev/null
+++ b/core/src/layers/await_tree.rs
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+
+use async_trait::async_trait;
+
+use await_tree::InstrumentAwait;
+
+/// Add a Instrument await-tree for actor-based applications to the underlying
services.
+///
+/// # AwaitTree
+///
+/// await-tree allows developers to dump this execution tree at runtime,
+/// with the span of each Future annotated by instrument_await.
+/// Read more about [await-tree](https://docs.rs/await-tree/latest/await_tree/)
+///
+/// # Examples
+///
+/// ```
+/// use anyhow::Result;
+/// use opendal::layers::AwaitTreeLayer;
+/// use opendal::services;
+/// use opendal::Operator;
+/// use opendal::Scheme;
+///
+/// let _ = Operator::new(services::Memory::default())
+/// .expect("must init")
+/// .layer(AwaitTreeLayer::new())
+/// .finish();
+/// ```
+#[derive(Clone, Default)]
+pub struct AwaitTreeLayer {}
+
+impl AwaitTreeLayer {
+ /// Create a new `AwaitTreeLayer`.
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl<A: Accessor> Layer<A> for AwaitTreeLayer {
+ type LayeredAccessor = AwaitTreeAccessor<A>;
+
+ fn layer(&self, accessor: A) -> Self::LayeredAccessor {
+ AwaitTreeAccessor { inner: accessor }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct AwaitTreeAccessor<A: Accessor> {
+ inner: A,
+}
+
+#[async_trait]
+impl<A: Accessor> LayeredAccessor for AwaitTreeAccessor<A> {
+ type Inner = A;
+ type Reader = A::Reader;
+ type BlockingReader = A::BlockingReader;
+ type Writer = A::Writer;
+ type BlockingWriter = A::BlockingWriter;
+ type Appender = A::Appender;
+ type Pager = A::Pager;
+ type BlockingPager = A::BlockingPager;
+
+ fn inner(&self) -> &Self::Inner {
+ &self.inner
+ }
+
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ self.inner
+ .read(path, args)
+ .instrument_await(format!("opendal::{}", Operation::Read))
+ .await
+ }
+
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ self.inner
+ .write(path, args)
+ .instrument_await(format!("opendal::{}", Operation::Write))
+ .await
+ }
+
+ async fn append(&self, path: &str, args: OpAppend) -> Result<(RpAppend,
Self::Appender)> {
+ self.inner
+ .append(path, args)
+ .instrument_await(format!("opendal::{}", Operation::Append))
+ .await
+ }
+
+ async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy>
{
+ self.inner()
+ .copy(from, to, args)
+ .instrument_await(format!("opendal::{}", Operation::Copy))
+ .await
+ }
+
+ async fn rename(&self, from: &str, to: &str, args: OpRename) ->
Result<RpRename> {
+ self.inner()
+ .rename(from, to, args)
+ .instrument_await(format!("opendal::{}", Operation::Rename))
+ .await
+ }
+
+ async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+ self.inner
+ .stat(path, args)
+ .instrument_await(format!("opendal::{}", Operation::Stat))
+ .await
+ }
+
+ async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
+ self.inner
+ .delete(path, args)
+ .instrument_await(format!("opendal::{}", Operation::Delete))
+ .await
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Pager)> {
+ self.inner
+ .list(path, args)
+ .instrument_await(format!("opendal::{}", Operation::List))
+ .await
+ }
+
+ async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+ self.inner
+ .presign(path, args)
+ .instrument_await(format!("opendal::{}", Operation::Presign))
+ .await
+ }
+
+ async fn batch(&self, args: OpBatch) -> Result<RpBatch> {
+ self.inner
+ .batch(args)
+ .instrument_await(format!("opendal::{}", Operation::Batch))
+ .await
+ }
+
+ fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::BlockingReader)> {
+ self.inner.blocking_read(path, args)
+ }
+
+ fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::BlockingWriter)> {
+ self.inner.blocking_write(path, args)
+ }
+
+ fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::BlockingPager)> {
+ self.inner.blocking_list(path, args)
+ }
+}
diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs
index ab5861a0c..83170390b 100644
--- a/core/src/layers/mod.rs
+++ b/core/src/layers/mod.rs
@@ -84,3 +84,7 @@ mod throttle;
pub use self::oteltrace::OtelTraceLayer;
#[cfg(feature = "layers-throttle")]
pub use self::throttle::ThrottleLayer;
+#[cfg(feature = "layers-await-tree")]
+mod await_tree;
+#[cfg(feature = "layers-await-tree")]
+pub use self::await_tree::AwaitTreeLayer;