This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 027723f16 chore: move from tokio::sync to mea primitives  (#6818)
027723f16 is described below

commit 027723f16286d9a908e0f88b1eb5a04a7f710431
Author: tison <[email protected]>
AuthorDate: Tue Nov 25 18:17:35 2025 +0800

    chore: move from tokio::sync to mea primitives  (#6818)
    
    * chore: move from tokio::sync to mea primitives
    
    Signed-off-by: tison <[email protected]>
    
    * for oncecell
    
    Signed-off-by: tison <[email protected]>
    
    * for notify
    
    Signed-off-by: tison <[email protected]>
    
    * fixup signature
    
    Signed-off-by: tison <[email protected]>
    
    * fixup compile
    
    Signed-off-by: tison <[email protected]>
    
    ---------
    
    Signed-off-by: tison <[email protected]>
---
 bindings/cpp/Cargo.toml                         |  1 +
 bindings/cpp/src/async.rs                       |  6 +--
 bindings/python/Cargo.toml                      |  1 +
 bindings/python/src/file.rs                     |  2 +-
 bindings/python/src/lister.rs                   |  5 ++-
 core/Cargo.lock                                 | 10 +++++
 core/Cargo.toml                                 |  3 +-
 core/src/layers/concurrent_limit.rs             | 49 ++++------------------
 core/src/raw/oio/write/multipart_write.rs       |  2 +-
 core/src/raw/path_cache.rs                      |  4 +-
 core/src/services/aliyun_drive/backend.rs       |  2 +-
 core/src/services/aliyun_drive/core.rs          |  4 +-
 core/src/services/b2/backend.rs                 |  2 +-
 core/src/services/b2/core.rs                    |  2 +-
 core/src/services/dropbox/builder.rs            |  2 +-
 core/src/services/dropbox/core.rs               |  2 +-
 core/src/services/etcd/backend.rs               |  2 +-
 core/src/services/etcd/core.rs                  |  7 ++--
 core/src/services/ftp/backend.rs                |  2 +-
 core/src/services/ftp/core.rs                   |  4 +-
 core/src/services/gdrive/builder.rs             |  2 +-
 core/src/services/gdrive/core.rs                |  2 +-
 core/src/services/gridfs/backend.rs             |  2 +-
 core/src/services/gridfs/core.rs                |  2 +-
 core/src/services/koofr/backend.rs              |  4 +-
 core/src/services/koofr/core.rs                 |  4 +-
 core/src/services/memcached/backend.rs          |  2 +-
 core/src/services/memcached/core.rs             |  4 +-
 core/src/services/mongodb/backend.rs            |  2 +-
 core/src/services/mongodb/core.rs               |  2 +-
 core/src/services/mysql/backend.rs              |  2 +-
 core/src/services/mysql/core.rs                 |  7 ++--
 core/src/services/onedrive/builder.rs           |  2 +-
 core/src/services/onedrive/core.rs              |  2 +-
 core/src/services/postgresql/backend.rs         |  2 +-
 core/src/services/postgresql/core.rs            |  7 ++--
 core/src/services/redis/backend.rs              |  2 +-
 core/src/services/redis/core.rs                 |  2 +-
 core/src/services/seafile/backend.rs            |  2 +-
 core/src/services/seafile/core.rs               |  2 +-
 core/src/services/sftp/backend.rs               |  2 +-
 core/src/services/sftp/core.rs                  |  2 +-
 core/src/services/sqlite/backend.rs             |  4 +-
 core/src/services/sqlite/core.rs                |  7 ++--
 core/src/services/surrealdb/backend.rs          |  2 +-
 core/src/services/surrealdb/core.rs             |  2 +-
 core/src/services/tikv/backend.rs               |  2 +-
 core/src/services/tikv/core.rs                  | 55 ++++++++++++-------------
 core/src/services/webhdfs/backend.rs            |  2 +-
 core/src/services/webhdfs/core.rs               |  2 +-
 core/src/types/context/write.rs                 |  2 +-
 integrations/object_store/Cargo.toml            |  1 +
 integrations/object_store/src/service/writer.rs |  8 +---
 integrations/object_store/src/store.rs          | 23 ++++++-----
 54 files changed, 130 insertions(+), 152 deletions(-)

diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
index 169234739..41c443734 100644
--- a/bindings/cpp/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -34,6 +34,7 @@ anyhow = { version = "1.0.100" }
 cxx = { version = "1.0.186" }
 cxx-async = { version = "0.1.3", optional = true }
 futures = { version = "0.3.31" }
+mea = { version = "0.5.1" }
 # this crate won't be published, we always use the local version
 opendal = { version = ">=0", path = "../../core", features = ["blocking"] }
 tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] }
diff --git a/bindings/cpp/src/async.rs b/bindings/cpp/src/async.rs
index be461ea51..d0a3d2d5d 100644
--- a/bindings/cpp/src/async.rs
+++ b/bindings/cpp/src/async.rs
@@ -17,6 +17,7 @@
 
 use anyhow::Result;
 use cxx_async::CxxAsyncException;
+use mea::mutex::Mutex;
 use opendal as od;
 use std::collections::HashMap;
 use std::future::Future;
@@ -24,7 +25,6 @@ use std::ops::Deref;
 use std::pin::Pin;
 use std::str::FromStr;
 use std::sync::{Arc, OnceLock};
-use tokio::sync::Mutex;
 
 #[cxx::bridge(namespace = opendal::ffi::async_op)]
 mod ffi {
@@ -321,7 +321,7 @@ unsafe fn lister_next(lister: ffi::ListerPtr) -> RustFutureEntryOption {
 
 fn delete_reader(reader: ffi::ReaderPtr) {
     // Use blocking lock since this is called from C++ destructors
-    if let Ok(mut storage) = get_reader_storage().try_lock() {
+    if let Some(mut storage) = get_reader_storage().try_lock() {
         storage.remove(&reader.id);
     }
     // If we can't get the lock immediately, we'll just skip cleanup
@@ -330,7 +330,7 @@ fn delete_reader(reader: ffi::ReaderPtr) {
 
 fn delete_lister(lister: ffi::ListerPtr) {
     // Use blocking lock since this is called from C++ destructors
-    if let Ok(mut storage) = get_lister_storage().try_lock() {
+    if let Some(mut storage) = get_lister_storage().try_lock() {
         storage.remove(&lister.id);
     }
     // If we can't get the lock immediately, we'll just skip cleanup
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index f585ab5fe..7d204be47 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -201,6 +201,7 @@ bytes = "1.5.0"
 dict_derive = "0.6.0"
 futures = "0.3.28"
 jiff = { version = "0.2.15" }
+mea = { version = "0.5.1" }
 # this crate won't be published, we always use the local version
 opendal = { version = ">=0", path = "../../core", features = [
   "blocking",
diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs
index 6e1d6a861..da744eb39 100644
--- a/bindings/python/src/file.rs
+++ b/bindings/python/src/file.rs
@@ -26,13 +26,13 @@ use std::sync::Arc;
 use futures::AsyncReadExt;
 use futures::AsyncSeekExt;
 use futures::AsyncWriteExt;
+use mea::mutex::Mutex;
 use pyo3::IntoPyObjectExt;
 use pyo3::buffer::PyBuffer;
 use pyo3::exceptions::PyIOError;
 use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
 use pyo3_async_runtimes::tokio::future_into_py;
-use tokio::sync::Mutex;
 
 use crate::*;
 
diff --git a/bindings/python/src/lister.rs b/bindings/python/src/lister.rs
index dfa5193cf..04b77ec58 100644
--- a/bindings/python/src/lister.rs
+++ b/bindings/python/src/lister.rs
@@ -18,10 +18,11 @@
 use std::sync::Arc;
 
 use futures::TryStreamExt;
+use mea::mutex::Mutex;
+use pyo3::IntoPyObjectExt;
 use pyo3::exceptions::PyStopAsyncIteration;
-use pyo3::{IntoPyObjectExt, prelude::*};
+use pyo3::prelude::*;
 use pyo3_async_runtimes::tokio::future_into_py;
-use tokio::sync::Mutex;
 
 use crate::*;
 
diff --git a/core/Cargo.lock b/core/Cargo.lock
index 76d6c1277..7ab3d1485 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -4770,6 +4770,15 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "mea"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "c84bb668065d5f9eca80c5d072bb3357fa7e09fb6dbfed2f08c8f99f1749d302"
+dependencies = [
+ "slab",
+]
+
 [[package]]
 name = "memchr"
 version = "2.7.6"
@@ -5372,6 +5381,7 @@ dependencies = [
  "libtest-mimic",
  "log",
  "md-5",
+ "mea",
  "metrics",
  "mime_guess",
  "mini-moka",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 5ad6f706d..16224b93b 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -252,6 +252,7 @@ http-body = "1"
 jiff = { version = "0.2.15", features = ["serde"] }
 log = "0.4"
 md-5 = "0.10"
+mea = { version = "0.5.1" }
 percent-encoding = "2"
 quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] }
 reqwest = { version = "0.12.24", features = [
@@ -259,7 +260,7 @@ reqwest = { version = "0.12.24", features = [
 ], default-features = false }
 serde = { version = "1", features = ["derive"] }
 serde_json = "1"
-tokio = { version = "1.48", features = ["sync", "io-util"] }
+tokio = { version = "1.48", features = ["io-util"] }
 url = "2.5"
 uuid = { version = "1", features = ["serde", "v4"] }
 
diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index d51f4de45..0f42007ba 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -23,8 +23,8 @@ use std::task::Poll;
 
 use futures::Stream;
 use futures::StreamExt;
-use tokio::sync::OwnedSemaphorePermit;
-use tokio::sync::Semaphore;
+use mea::semaphore::OwnedSemaphorePermit;
+use mea::semaphore::Semaphore;
 
 use crate::raw::*;
 use crate::*;
@@ -139,10 +139,7 @@ impl HttpFetch for ConcurrentLimitHttpFetcher {
             return self.inner.fetch(req).await;
         };
 
-        let permit = semaphore
-            .acquire_owned()
-            .await
-            .expect("semaphore must be valid");
+        let permit = semaphore.acquire_owned(1).await;
 
         let resp = self.inner.fetch(req).await?;
         let (parts, body) = resp.into_parts();
@@ -191,22 +188,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
-        let _permit = self
-            .semaphore
-            .acquire()
-            .await
-            .expect("semaphore must be valid");
+        let _permit = self.semaphore.acquire(1).await;
 
         self.inner.create_dir(path, args).await
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let permit = self
-            .semaphore
-            .clone()
-            .acquire_owned()
-            .await
-            .expect("semaphore must be valid");
+        let permit = self.semaphore.clone().acquire_owned(1).await;
 
         self.inner
             .read(path, args)
@@ -215,12 +203,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let permit = self
-            .semaphore
-            .clone()
-            .acquire_owned()
-            .await
-            .expect("semaphore must be valid");
+        let permit = self.semaphore.clone().acquire_owned(1).await;
 
         self.inner
             .write(path, args)
@@ -229,22 +212,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
-        let _permit = self
-            .semaphore
-            .acquire()
-            .await
-            .expect("semaphore must be valid");
+        let _permit = self.semaphore.acquire(1).await;
 
         self.inner.stat(path, args).await
     }
 
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
-        let permit = self
-            .semaphore
-            .clone()
-            .acquire_owned()
-            .await
-            .expect("semaphore must be valid");
+        let permit = self.semaphore.clone().acquire_owned(1).await;
 
         self.inner
             .delete()
@@ -253,12 +227,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
     }
 
     async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
-        let permit = self
-            .semaphore
-            .clone()
-            .acquire_owned()
-            .await
-            .expect("semaphore must be valid");
+        let permit = self.semaphore.clone().acquire_owned(1).await;
 
         self.inner
             .list(path, args)
diff --git a/core/src/raw/oio/write/multipart_write.rs b/core/src/raw/oio/write/multipart_write.rs
index 03bb8d270..403774826 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -309,11 +309,11 @@ where
 mod tests {
     use std::time::Duration;
 
+    use mea::mutex::Mutex;
     use pretty_assertions::assert_eq;
     use rand::Rng;
     use rand::RngCore;
     use rand::thread_rng;
-    use tokio::sync::Mutex;
     use tokio::time::sleep;
     use tokio::time::timeout;
 
diff --git a/core/src/raw/path_cache.rs b/core/src/raw/path_cache.rs
index ef8f51253..1d044f38e 100644
--- a/core/src/raw/path_cache.rs
+++ b/core/src/raw/path_cache.rs
@@ -18,9 +18,9 @@
 use std::collections::VecDeque;
 
 use futures::Future;
+use mea::mutex::Mutex;
+use mea::mutex::MutexGuard;
 use moka::sync::Cache;
-use tokio::sync::Mutex;
-use tokio::sync::MutexGuard;
 
 use crate::raw::*;
 use crate::*;
diff --git a/core/src/services/aliyun_drive/backend.rs b/core/src/services/aliyun_drive/backend.rs
index a88927650..27fdb6da7 100644
--- a/core/src/services/aliyun_drive/backend.rs
+++ b/core/src/services/aliyun_drive/backend.rs
@@ -22,7 +22,7 @@ use bytes::Buf;
 use http::Response;
 use http::StatusCode;
 use log::debug;
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
 
 use super::ALIYUN_DRIVE_SCHEME;
 use super::config::AliyunDriveConfig;
diff --git a/core/src/services/aliyun_drive/core.rs b/core/src/services/aliyun_drive/core.rs
index eb20a7867..05cee528b 100644
--- a/core/src/services/aliyun_drive/core.rs
+++ b/core/src/services/aliyun_drive/core.rs
@@ -22,11 +22,11 @@ use bytes::Buf;
 use http::Method;
 use http::Request;
 use http::Response;
+use http::header;
 use http::header::HeaderValue;
-use http::header::{self};
+use mea::mutex::Mutex;
 use serde::Deserialize;
 use serde::Serialize;
-use tokio::sync::Mutex;
 
 use super::error::parse_error;
 use crate::raw::*;
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
index b4a7bfc0b..1f78f0ddc 100644
--- a/core/src/services/b2/backend.rs
+++ b/core/src/services/b2/backend.rs
@@ -22,7 +22,7 @@ use http::Request;
 use http::Response;
 use http::StatusCode;
 use log::debug;
-use tokio::sync::RwLock;
+use mea::rwlock::RwLock;
 
 use super::B2_SCHEME;
 use super::config::B2Config;
diff --git a/core/src/services/b2/core.rs b/core/src/services/b2/core.rs
index 636200a38..b88a20598 100644
--- a/core/src/services/b2/core.rs
+++ b/core/src/services/b2/core.rs
@@ -24,9 +24,9 @@ use http::Request;
 use http::Response;
 use http::StatusCode;
 use http::header;
+use mea::rwlock::RwLock;
 use serde::Deserialize;
 use serde::Serialize;
-use tokio::sync::RwLock;
 
 use self::constants::X_BZ_CONTENT_SHA1;
 use self::constants::X_BZ_FILE_NAME;
diff --git a/core/src/services/dropbox/builder.rs b/core/src/services/dropbox/builder.rs
index e9e0ec71f..bcda4694e 100644
--- a/core/src/services/dropbox/builder.rs
+++ b/core/src/services/dropbox/builder.rs
@@ -18,7 +18,7 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
 
 use super::DROPBOX_SCHEME;
 use super::backend::DropboxBackend;
diff --git a/core/src/services/dropbox/core.rs b/core/src/services/dropbox/core.rs
index 30e61ce35..396c73fe2 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -27,9 +27,9 @@ use http::StatusCode;
 use http::header;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
+use mea::mutex::Mutex;
 use serde::Deserialize;
 use serde::Serialize;
-use tokio::sync::Mutex;
 
 use super::error::parse_error;
 use crate::raw::*;
diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs
index 77002ca33..8a9d916f0 100644
--- a/core/src/services/etcd/backend.rs
+++ b/core/src/services/etcd/backend.rs
@@ -21,7 +21,7 @@ use etcd_client::Certificate;
 use etcd_client::ConnectOptions;
 use etcd_client::Identity;
 use etcd_client::TlsOptions;
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
 
 use super::ETCD_SCHEME;
 use super::config::EtcdConfig;
diff --git a/core/src/services/etcd/core.rs b/core/src/services/etcd/core.rs
index e21fa9e8e..fcb07a73d 100644
--- a/core/src/services/etcd/core.rs
+++ b/core/src/services/etcd/core.rs
@@ -17,11 +17,12 @@
 
 use std::fmt::Debug;
 
+use bb8::Pool;
 use bb8::PooledConnection;
 use bb8::RunError;
 use etcd_client::Client;
 use etcd_client::ConnectOptions;
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
 
 use crate::services::etcd::error::format_etcd_error;
 use crate::{Buffer, Error, ErrorKind, Result};
@@ -63,7 +64,7 @@ impl bb8::ManageConnection for Manager {
 pub struct EtcdCore {
     pub endpoints: Vec<String>,
     pub options: ConnectOptions,
-    pub client: OnceCell<bb8::Pool<Manager>>,
+    pub client: OnceCell<Pool<Manager>>,
 }
 
 impl Debug for EtcdCore {
@@ -80,7 +81,7 @@ impl EtcdCore {
         let client = self
             .client
             .get_or_try_init(|| async {
-                bb8::Pool::builder()
+                Pool::builder()
                     .max_size(64)
                     .build(Manager {
                         endpoints: self.endpoints.clone(),
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index af7700ec4..0b0c87b13 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -21,12 +21,12 @@ use std::sync::Arc;
 
 use http::Uri;
 use log::debug;
+use mea::once::OnceCell;
 use services::ftp::core::Manager;
 use suppaftp::FtpError;
 use suppaftp::Status;
 use suppaftp::list::File;
 use suppaftp::types::Response;
-use tokio::sync::OnceCell;
 
 use super::FTP_SCHEME;
 use super::config::FtpConfig;
diff --git a/core/src/services/ftp/core.rs b/core/src/services/ftp/core.rs
index 8232aa107..42d427096 100644
--- a/core/src/services/ftp/core.rs
+++ b/core/src/services/ftp/core.rs
@@ -21,6 +21,7 @@ use bb8::Pool;
 use bb8::PooledConnection;
 use bb8::RunError;
 use futures_rustls::TlsConnector;
+use mea::once::OnceCell;
 use raw::Operation;
 use suppaftp::AsyncRustlsConnector;
 use suppaftp::AsyncRustlsFtpStream;
@@ -29,7 +30,6 @@ use suppaftp::ImplAsyncFtpStream;
 use suppaftp::Status;
 use suppaftp::rustls::ClientConfig;
 use suppaftp::types::FileType;
-use tokio::sync::OnceCell;
 
 use super::err::parse_error;
 use crate::raw::AccessorInfo;
@@ -46,7 +46,7 @@ impl FtpCore {
         let pool = self
             .pool
             .get_or_try_init(|| async {
-                bb8::Pool::builder()
+                Pool::builder()
                     .max_size(64)
                     .build(self.manager.clone())
                     .await
diff --git a/core/src/services/gdrive/builder.rs b/core/src/services/gdrive/builder.rs
index df1273beb..5abe7e7ed 100644
--- a/core/src/services/gdrive/builder.rs
+++ b/core/src/services/gdrive/builder.rs
@@ -19,7 +19,7 @@ use std::fmt::Debug;
 use std::sync::Arc;
 
 use log::debug;
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
 
 use super::GDRIVE_SCHEME;
 use super::backend::GdriveBackend;
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index 7e4f00db3..0b89c14b3 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -25,9 +25,9 @@ use http::Request;
 use http::Response;
 use http::StatusCode;
 use http::header;
+use mea::mutex::Mutex;
 use serde::Deserialize;
 use serde_json::json;
-use tokio::sync::Mutex;
 
 use super::error::parse_error;
 use crate::raw::*;
diff --git a/core/src/services/gridfs/backend.rs b/core/src/services/gridfs/backend.rs
index 71e3ae875..7519cacb8 100644
--- a/core/src/services/gridfs/backend.rs
+++ b/core/src/services/gridfs/backend.rs
@@ -17,7 +17,7 @@
 
 use std::sync::Arc;
 
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
 
 use super::GRIDFS_SCHEME;
 use super::config::GridfsConfig;
diff --git a/core/src/services/gridfs/core.rs b/core/src/services/gridfs/core.rs
index 2c06e3cc6..cdc2a8ac0 100644
--- a/core/src/services/gridfs/core.rs
+++ b/core/src/services/gridfs/core.rs
@@ -19,11 +19,11 @@ use std::fmt::Debug;
 
 use futures::AsyncReadExt;
 use futures::AsyncWriteExt;
+use mea::once::OnceCell;
 use mongodb::bson::doc;
 use mongodb::gridfs::GridFsBucket;
 use mongodb::options::ClientOptions;
 use mongodb::options::GridFsBucketOptions;
-use tokio::sync::OnceCell;
 
 use crate::raw::*;
 use crate::*;
diff --git a/core/src/services/koofr/backend.rs b/core/src/services/koofr/backend.rs
index a887b0b02..6a2481109 100644
--- a/core/src/services/koofr/backend.rs
+++ b/core/src/services/koofr/backend.rs
@@ -22,8 +22,8 @@ use bytes::Buf;
 use http::Response;
 use http::StatusCode;
 use log::debug;
-use tokio::sync::Mutex;
-use tokio::sync::OnceCell;
+use mea::mutex::Mutex;
+use mea::once::OnceCell;
 
 use super::KOOFR_SCHEME;
 use super::config::KoofrConfig;
diff --git a/core/src/services/koofr/core.rs b/core/src/services/koofr/core.rs
index c9747501a..83c42f163 100644
--- a/core/src/services/koofr/core.rs
+++ b/core/src/services/koofr/core.rs
@@ -26,10 +26,10 @@ use http::Response;
 use http::StatusCode;
 use http::header;
 use http::request;
+use mea::mutex::Mutex;
+use mea::once::OnceCell;
 use serde::Deserialize;
 use serde_json::json;
-use tokio::sync::Mutex;
-use tokio::sync::OnceCell;
 
 use super::error::parse_error;
 use crate::raw::*;
diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs
index 86d4b1877..22ce701c6 100644
--- a/core/src/services/memcached/backend.rs
+++ b/core/src/services/memcached/backend.rs
@@ -18,7 +18,7 @@
 use std::sync::Arc;
 use std::time::Duration;
 
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
 
 use super::MEMCACHED_SCHEME;
 use super::config::MemcachedConfig;
diff --git a/core/src/services/memcached/core.rs b/core/src/services/memcached/core.rs
index c95b62ea4..add582f80 100644
--- a/core/src/services/memcached/core.rs
+++ b/core/src/services/memcached/core.rs
@@ -18,8 +18,8 @@
 use std::time::Duration;
 
 use bb8::RunError;
+use mea::once::OnceCell;
 use tokio::net::TcpStream;
-use tokio::sync::OnceCell;
 
 use super::binary;
 use crate::raw::*;
@@ -95,7 +95,7 @@ impl MemcachedCore {
                     .build(mgr)
                     .await
                     .map_err(|err| {
-                        Error::new(ErrorKind::ConfigInvalid, "connect to memecached failed")
+                        Error::new(ErrorKind::ConfigInvalid, "connect to memcached failed")
                             .set_source(err)
                     })
             })
diff --git a/core/src/services/mongodb/backend.rs b/core/src/services/mongodb/backend.rs
index 2068c9a2e..17513fd9f 100644
--- a/core/src/services/mongodb/backend.rs
+++ b/core/src/services/mongodb/backend.rs
@@ -17,7 +17,7 @@
 
 use std::sync::Arc;
 
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
 
 use super::MONGODB_SCHEME;
 use super::config::MongodbConfig;
diff --git a/core/src/services/mongodb/core.rs b/core/src/services/mongodb/core.rs
index 820242359..9e8a6ee82 100644
--- a/core/src/services/mongodb/core.rs
+++ b/core/src/services/mongodb/core.rs
@@ -17,11 +17,11 @@
 
 use std::fmt::Debug;
 
+use mea::once::OnceCell;
 use mongodb::bson::Binary;
 use mongodb::bson::Document;
 use mongodb::bson::doc;
 use mongodb::options::ClientOptions;
-use tokio::sync::OnceCell;
 
 use crate::*;
 
diff --git a/core/src/services/mysql/backend.rs b/core/src/services/mysql/backend.rs
index bede75b6e..cb6ee4678 100644
--- a/core/src/services/mysql/backend.rs
+++ b/core/src/services/mysql/backend.rs
@@ -18,8 +18,8 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use mea::once::OnceCell;
 use sqlx::mysql::MySqlConnectOptions;
-use tokio::sync::OnceCell;
 
 use super::MYSQL_SCHEME;
 use super::config::MysqlConfig;
diff --git a/core/src/services/mysql/core.rs b/core/src/services/mysql/core.rs
index c491106eb..3a346e9c5 100644
--- a/core/src/services/mysql/core.rs
+++ b/core/src/services/mysql/core.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use mea::once::OnceCell;
 use sqlx::MySqlPool;
 use sqlx::mysql::MySqlConnectOptions;
-use tokio::sync::OnceCell;
 
 use crate::*;
 
@@ -35,10 +35,9 @@ impl MysqlCore {
     async fn get_client(&self) -> Result<&MySqlPool> {
         self.pool
             .get_or_try_init(|| async {
-                let pool = MySqlPool::connect_with(self.config.clone())
+                MySqlPool::connect_with(self.config.clone())
                     .await
-                    .map_err(parse_mysql_error)?;
-                Ok(pool)
+                    .map_err(parse_mysql_error)
             })
             .await
     }
diff --git a/core/src/services/onedrive/builder.rs b/core/src/services/onedrive/builder.rs
index 659fa376d..a058d27f1 100644
--- a/core/src/services/onedrive/builder.rs
+++ b/core/src/services/onedrive/builder.rs
@@ -19,9 +19,9 @@ use std::fmt::Debug;
 use std::sync::Arc;
 
 use log::debug;
+use mea::mutex::Mutex;
 use services::onedrive::core::OneDriveCore;
 use services::onedrive::core::OneDriveSigner;
-use tokio::sync::Mutex;
 
 use super::ONEDRIVE_SCHEME;
 use super::backend::OnedriveBackend;
diff --git a/core/src/services/onedrive/core.rs b/core/src/services/onedrive/core.rs
index 4ebd93014..1867fbb65 100644
--- a/core/src/services/onedrive/core.rs
+++ b/core/src/services/onedrive/core.rs
@@ -25,7 +25,7 @@ use http::Request;
 use http::Response;
 use http::StatusCode;
 use http::header;
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
 
 use super::error::parse_error;
 use super::graph_model::*;
diff --git a/core/src/services/postgresql/backend.rs b/core/src/services/postgresql/backend.rs
index 5f7900748..338f70b6d 100644
--- a/core/src/services/postgresql/backend.rs
+++ b/core/src/services/postgresql/backend.rs
@@ -17,8 +17,8 @@
 
 use std::sync::Arc;
 
+use mea::once::OnceCell;
 use sqlx::postgres::PgConnectOptions;
-use tokio::sync::OnceCell;
 
 use super::POSTGRESQL_SCHEME;
 use super::config::PostgresqlConfig;
diff --git a/core/src/services/postgresql/core.rs b/core/src/services/postgresql/core.rs
index b18df9fb7..309fac33a 100644
--- a/core/src/services/postgresql/core.rs
+++ b/core/src/services/postgresql/core.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use mea::once::OnceCell;
 use sqlx::PgPool;
 use sqlx::postgres::PgConnectOptions;
-use tokio::sync::OnceCell;
 
 use crate::*;
 
@@ -35,10 +35,9 @@ impl PostgresqlCore {
     async fn get_client(&self) -> Result<&PgPool> {
         self.pool
             .get_or_try_init(|| async {
-                let pool = PgPool::connect_with(self.config.clone())
+                PgPool::connect_with(self.config.clone())
                     .await
-                    .map_err(parse_postgres_error)?;
-                Ok(pool)
+                    .map_err(parse_postgres_error)
             })
             .await
     }
diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs
index 3c4b7619c..5365422cc 100644
--- a/core/src/services/redis/backend.rs
+++ b/core/src/services/redis/backend.rs
@@ -20,13 +20,13 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use http::Uri;
+use mea::once::OnceCell;
 use redis::Client;
 use redis::ConnectionAddr;
 use redis::ConnectionInfo;
 use redis::ProtocolVersion;
 use redis::RedisConnectionInfo;
 use redis::cluster::ClusterClientBuilder;
-use tokio::sync::OnceCell;
 
 use super::REDIS_SCHEME;
 use super::config::RedisConfig;
diff --git a/core/src/services/redis/core.rs b/core/src/services/redis/core.rs
index 50afc909d..21f609f51 100644
--- a/core/src/services/redis/core.rs
+++ b/core/src/services/redis/core.rs
@@ -20,6 +20,7 @@ use std::time::Duration;
 
 use bb8::RunError;
 use bytes::Bytes;
+use mea::once::OnceCell;
 use redis::AsyncCommands;
 use redis::Client;
 use redis::Cmd;
@@ -31,7 +32,6 @@ use redis::aio::ConnectionLike;
 use redis::aio::ConnectionManager;
 use redis::cluster::ClusterClient;
 use redis::cluster_async::ClusterConnection;
-use tokio::sync::OnceCell;
 
 use crate::*;
 
diff --git a/core/src/services/seafile/backend.rs b/core/src/services/seafile/backend.rs
index 43114a8e8..d0d11de3f 100644
--- a/core/src/services/seafile/backend.rs
+++ b/core/src/services/seafile/backend.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
 use http::Response;
 use http::StatusCode;
 use log::debug;
-use tokio::sync::RwLock;
+use mea::rwlock::RwLock;
 
 use super::SEAFILE_SCHEME;
 use super::config::SeafileConfig;
diff --git a/core/src/services/seafile/core.rs b/core/src/services/seafile/core.rs
index 3a3c60e62..e26a1cf70 100644
--- a/core/src/services/seafile/core.rs
+++ b/core/src/services/seafile/core.rs
@@ -24,8 +24,8 @@ use http::Request;
 use http::Response;
 use http::StatusCode;
 use http::header;
+use mea::rwlock::RwLock;
 use serde::Deserialize;
-use tokio::sync::RwLock;
 
 use super::error::parse_error;
 use crate::raw::*;
diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs
index b5758c470..ca2e17a81 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -21,9 +21,9 @@ use std::path::PathBuf;
 use std::sync::Arc;
 
 use log::debug;
+use mea::once::OnceCell;
 use openssh::KnownHosts;
 use tokio::io::AsyncSeekExt;
-use tokio::sync::OnceCell;
 
 use super::SFTP_SCHEME;
 use super::config::SftpConfig;
diff --git a/core/src/services/sftp/core.rs b/core/src/services/sftp/core.rs
index ade9f200c..327cf57af 100644
--- a/core/src/services/sftp/core.rs
+++ b/core/src/services/sftp/core.rs
@@ -23,11 +23,11 @@ use std::sync::Arc;
 use bb8::PooledConnection;
 use bb8::RunError;
 use log::debug;
+use mea::once::OnceCell;
 use openssh::KnownHosts;
 use openssh::SessionBuilder;
 use openssh_sftp_client::Sftp;
 use openssh_sftp_client::SftpOptions;
-use tokio::sync::OnceCell;
 
 use super::error::is_sftp_protocol_error;
 use super::error::parse_sftp_error;
diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs
index 5ed83264f..8b05e765b 100644
--- a/core/src/services/sqlite/backend.rs
+++ b/core/src/services/sqlite/backend.rs
@@ -18,8 +18,8 @@
 use std::str::FromStr;
 use std::sync::Arc;
 
+use mea::once::OnceCell;
 use sqlx::sqlite::SqliteConnectOptions;
-use tokio::sync::OnceCell;
 
 use super::SQLITE_SCHEME;
 use super::config::SqliteConfig;
@@ -319,7 +319,7 @@ mod test {
     async fn build_client() -> OnceCell<SqlitePool> {
        let config = SqliteConnectOptions::from_str("sqlite::memory:").unwrap();
         let pool = SqlitePool::connect_with(config).await.unwrap();
-        OnceCell::new_with(Some(pool))
+        OnceCell::from_value(pool)
     }
 
     #[tokio::test]
diff --git a/core/src/services/sqlite/core.rs b/core/src/services/sqlite/core.rs
index 15d4ae2c4..fa55643e9 100644
--- a/core/src/services/sqlite/core.rs
+++ b/core/src/services/sqlite/core.rs
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use mea::once::OnceCell;
 use sqlx::SqlitePool;
 use sqlx::sqlite::SqliteConnectOptions;
 use std::fmt::Debug;
-use tokio::sync::OnceCell;
 
 use crate::services::sqlite::backend::parse_sqlite_error;
 use crate::*;
@@ -37,10 +37,9 @@ impl SqliteCore {
     pub async fn get_client(&self) -> Result<&SqlitePool> {
         self.pool
             .get_or_try_init(|| async {
-                let pool = SqlitePool::connect_with(self.config.clone())
+                SqlitePool::connect_with(self.config.clone())
                     .await
-                    .map_err(parse_sqlite_error)?;
-                Ok(pool)
+                    .map_err(parse_sqlite_error)
             })
             .await
     }
diff --git a/core/src/services/surrealdb/backend.rs 
b/core/src/services/surrealdb/backend.rs
index d7a8b30ad..15f0a1595 100644
--- a/core/src/services/surrealdb/backend.rs
+++ b/core/src/services/surrealdb/backend.rs
@@ -18,7 +18,7 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
 
 use super::SURREALDB_SCHEME;
 use super::config::SurrealdbConfig;
diff --git a/core/src/services/surrealdb/core.rs 
b/core/src/services/surrealdb/core.rs
index 5e7755b23..2b543e711 100644
--- a/core/src/services/surrealdb/core.rs
+++ b/core/src/services/surrealdb/core.rs
@@ -18,10 +18,10 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
+use mea::once::OnceCell;
 use surrealdb::Surreal;
 use surrealdb::engine::any::Any;
 use surrealdb::opt::auth::Database;
-use tokio::sync::OnceCell;
 
 use crate::*;
 
diff --git a/core/src/services/tikv/backend.rs 
b/core/src/services/tikv/backend.rs
index 681e357f8..ca5c77b26 100644
--- a/core/src/services/tikv/backend.rs
+++ b/core/src/services/tikv/backend.rs
@@ -18,7 +18,7 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
 
 use super::TIKV_SCHEME;
 use super::config::TikvConfig;
diff --git a/core/src/services/tikv/core.rs b/core/src/services/tikv/core.rs
index 55ca8539c..363bf665e 100644
--- a/core/src/services/tikv/core.rs
+++ b/core/src/services/tikv/core.rs
@@ -17,9 +17,9 @@
 
 use std::fmt::Debug;
 
+use mea::once::OnceCell;
 use tikv_client::Config;
 use tikv_client::RawClient;
-use tokio::sync::OnceCell;
 
 use super::TIKV_SCHEME;
 use crate::*;
@@ -45,33 +45,32 @@ impl Debug for TikvCore {
 }
 
 impl TikvCore {
-    async fn get_connection(&self) -> Result<RawClient> {
-        if let Some(client) = self.client.get() {
-            return Ok(client.clone());
-        }
-        let client = if self.insecure {
-            RawClient::new(self.endpoints.clone())
-                .await
-                .map_err(parse_tikv_config_error)?
-        } else if self.ca_path.is_some() && self.key_path.is_some() && 
self.cert_path.is_some() {
-            let (ca_path, key_path, cert_path) = (
-                self.ca_path.clone().unwrap(),
-                self.key_path.clone().unwrap(),
-                self.cert_path.clone().unwrap(),
-            );
-            let config = Config::default().with_security(ca_path, cert_path, 
key_path);
-            RawClient::new_with_config(self.endpoints.clone(), config)
-                .await
-                .map_err(parse_tikv_config_error)?
-        } else {
-            return Err(
-                Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
-                    .with_context("service", TIKV_SCHEME)
-                    .with_context("endpoints", format!("{:?}", 
self.endpoints)),
-            );
-        };
-        self.client.set(client.clone()).ok();
-        Ok(client)
+    async fn get_connection(&self) -> Result<&RawClient> {
+        self.client
+            .get_or_try_init(|| async {
+                if self.insecure {
+                    return RawClient::new(self.endpoints.clone())
+                        .await
+                        .map_err(parse_tikv_config_error);
+                }
+
+                if let Some(ca_path) = self.ca_path.as_ref()
+                    && let Some(key_path) = self.key_path.as_ref()
+                    && let Some(cert_path) = self.cert_path.as_ref()
+                {
+                    let config = Config::default().with_security(ca_path, 
cert_path, key_path);
+                    return RawClient::new_with_config(self.endpoints.clone(), 
config)
+                        .await
+                        .map_err(parse_tikv_config_error);
+                }
+
+                Err(
+                    Error::new(ErrorKind::ConfigInvalid, "invalid 
configuration")
+                        .with_context("service", TIKV_SCHEME)
+                        .with_context("endpoints", format!("{:?}", 
self.endpoints)),
+                )
+            })
+            .await
     }
 
     pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
diff --git a/core/src/services/webhdfs/backend.rs 
b/core/src/services/webhdfs/backend.rs
index 8913d0666..b0043cd6f 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -22,7 +22,7 @@ use bytes::Buf;
 use http::Response;
 use http::StatusCode;
 use log::debug;
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
 
 use super::WEBHDFS_SCHEME;
 use super::config::WebhdfsConfig;
diff --git a/core/src/services/webhdfs/core.rs 
b/core/src/services/webhdfs/core.rs
index a37725bee..4fef4886b 100644
--- a/core/src/services/webhdfs/core.rs
+++ b/core/src/services/webhdfs/core.rs
@@ -24,8 +24,8 @@ use http::Response;
 use http::StatusCode;
 use http::header::CONTENT_LENGTH;
 use http::header::CONTENT_TYPE;
+use mea::once::OnceCell;
 use serde::Deserialize;
-use tokio::sync::OnceCell;
 
 use super::error::parse_error;
 use crate::raw::*;
diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs
index b16799f38..cc6c7720c 100644
--- a/core/src/types/context/write.rs
+++ b/core/src/types/context/write.rs
@@ -210,13 +210,13 @@ mod tests {
     use bytes::BufMut;
     use bytes::Bytes;
     use log::debug;
+    use mea::mutex::Mutex;
     use pretty_assertions::assert_eq;
     use rand::Rng;
     use rand::RngCore;
     use rand::thread_rng;
     use sha2::Digest;
     use sha2::Sha256;
-    use tokio::sync::Mutex;
 
     use super::*;
     use crate::raw::oio::Write;
diff --git a/integrations/object_store/Cargo.toml 
b/integrations/object_store/Cargo.toml
index 534f17310..00dbf5bcc 100644
--- a/integrations/object_store/Cargo.toml
+++ b/integrations/object_store/Cargo.toml
@@ -41,6 +41,7 @@ async-trait = "0.1"
 bytes = "1"
 chrono = { version = "0.4.42", features = ["std", "clock"] }
 futures = "0.3"
+mea = { version = "0.5.0" }
 object_store = "0.12.3"
 opendal = { version = "0.55.0", path = "../../core", default-features = false }
 pin-project = "1.1"
diff --git a/integrations/object_store/src/service/writer.rs 
b/integrations/object_store/src/service/writer.rs
index 5dc9ec809..9b6c07dc6 100644
--- a/integrations/object_store/src/service/writer.rs
+++ b/integrations/object_store/src/service/writer.rs
@@ -23,10 +23,10 @@ use object_store::PutPayload;
 use object_store::path::Path as ObjectStorePath;
 use object_store::{Attribute, AttributeValue};
 
+use mea::mutex::Mutex;
 use opendal::raw::oio::MultipartPart;
 use opendal::raw::*;
 use opendal::*;
-use tokio::sync::Mutex;
 
 use super::core::{format_put_multipart_options, format_put_result, 
parse_op_write};
 use super::error::parse_error;
@@ -160,11 +160,7 @@ impl oio::MultipartWrite for ObjectStoreWriter {
         Ok(multipart_part)
     }
 
-    async fn complete_part(
-        &self,
-        _upload_id: &str,
-        parts: &[oio::MultipartPart],
-    ) -> Result<Metadata> {
+    async fn complete_part(&self, _upload_id: &str, parts: &[MultipartPart]) 
-> Result<Metadata> {
         // Validate that we have parts to complete
         if parts.is_empty() {
             return Err(Error::new(
diff --git a/integrations/object_store/src/store.rs 
b/integrations/object_store/src/store.rs
index 177ce72d5..08a377c18 100644
--- a/integrations/object_store/src/store.rs
+++ b/integrations/object_store/src/store.rs
@@ -27,6 +27,8 @@ use futures::FutureExt;
 use futures::StreamExt;
 use futures::TryStreamExt;
 use futures::stream::BoxStream;
+use mea::mutex::Mutex;
+use mea::oneshot;
 use object_store::ListResult;
 use object_store::MultipartUpload;
 use object_store::ObjectMeta;
@@ -45,7 +47,6 @@ use opendal::options::CopyOptions;
 use opendal::raw::percent_decode_path;
 use opendal::{Operator, OperatorInfo};
 use std::collections::HashMap;
-use tokio::sync::{Mutex, Notify};
 
 /// OpendalStore implements ObjectStore trait by using opendal.
 ///
@@ -608,15 +609,18 @@ impl ObjectStore for OpendalStore {
 struct OpendalMultipartUpload {
     writer: Arc<Mutex<Writer>>,
     location: Path,
-    next_notify: Option<Arc<Notify>>,
+    next_notify: oneshot::Receiver<()>,
 }
 
 impl OpendalMultipartUpload {
     fn new(writer: Writer, location: Path) -> Self {
+        // an immediately dropped sender for the first part to write without 
waiting
+        let (_, rx) = oneshot::channel();
+
         Self {
             writer: Arc::new(Mutex::new(writer)),
             location,
-            next_notify: None,
+            next_notify: rx,
         }
     }
 }
@@ -628,16 +632,13 @@ impl MultipartUpload for OpendalMultipartUpload {
         let location = self.location.clone();
 
         // Generate next notify which will be notified after the current part 
is written.
-        let next_notify = Arc::new(Notify::new());
+        let (tx, rx) = oneshot::channel();
         // Fetch the notify for current part to wait for it to be written.
-        let current_notify = self.next_notify.replace(next_notify.clone());
+        let last_rx = std::mem::replace(&mut self.next_notify, rx);
 
         async move {
-            // current_notify == None means that it's the first part, we don't 
need to wait.
-            if let Some(notify) = current_notify {
-                // Wait for the previous part to be written
-                notify.notified().await;
-            }
+            // Wait for the previous part to be written
+            let _ = last_rx.await;
 
             let mut writer = writer.lock().await;
             let result = writer
@@ -646,7 +647,7 @@ impl MultipartUpload for OpendalMultipartUpload {
                 .map_err(|err| format_object_store_error(err, 
location.as_ref()));
 
             // Notify the next part to be written
-            next_notify.notify_one();
+            drop(tx);
 
             result
         }


Reply via email to