This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 027723f16 chore: move from tokio::sync to mea primitives (#6818)
027723f16 is described below
commit 027723f16286d9a908e0f88b1eb5a04a7f710431
Author: tison <[email protected]>
AuthorDate: Tue Nov 25 18:17:35 2025 +0800
chore: move from tokio::sync to mea primitives (#6818)
* chore: move from tokio::sync to mea primitives
Signed-off-by: tison <[email protected]>
* for oncecell
Signed-off-by: tison <[email protected]>
* for notify
Signed-off-by: tison <[email protected]>
* fixup signature
Signed-off-by: tison <[email protected]>
* fixup compile
Signed-off-by: tison <[email protected]>
---------
Signed-off-by: tison <[email protected]>
---
bindings/cpp/Cargo.toml | 1 +
bindings/cpp/src/async.rs | 6 +--
bindings/python/Cargo.toml | 1 +
bindings/python/src/file.rs | 2 +-
bindings/python/src/lister.rs | 5 ++-
core/Cargo.lock | 10 +++++
core/Cargo.toml | 3 +-
core/src/layers/concurrent_limit.rs | 49 ++++------------------
core/src/raw/oio/write/multipart_write.rs | 2 +-
core/src/raw/path_cache.rs | 4 +-
core/src/services/aliyun_drive/backend.rs | 2 +-
core/src/services/aliyun_drive/core.rs | 4 +-
core/src/services/b2/backend.rs | 2 +-
core/src/services/b2/core.rs | 2 +-
core/src/services/dropbox/builder.rs | 2 +-
core/src/services/dropbox/core.rs | 2 +-
core/src/services/etcd/backend.rs | 2 +-
core/src/services/etcd/core.rs | 7 ++--
core/src/services/ftp/backend.rs | 2 +-
core/src/services/ftp/core.rs | 4 +-
core/src/services/gdrive/builder.rs | 2 +-
core/src/services/gdrive/core.rs | 2 +-
core/src/services/gridfs/backend.rs | 2 +-
core/src/services/gridfs/core.rs | 2 +-
core/src/services/koofr/backend.rs | 4 +-
core/src/services/koofr/core.rs | 4 +-
core/src/services/memcached/backend.rs | 2 +-
core/src/services/memcached/core.rs | 4 +-
core/src/services/mongodb/backend.rs | 2 +-
core/src/services/mongodb/core.rs | 2 +-
core/src/services/mysql/backend.rs | 2 +-
core/src/services/mysql/core.rs | 7 ++--
core/src/services/onedrive/builder.rs | 2 +-
core/src/services/onedrive/core.rs | 2 +-
core/src/services/postgresql/backend.rs | 2 +-
core/src/services/postgresql/core.rs | 7 ++--
core/src/services/redis/backend.rs | 2 +-
core/src/services/redis/core.rs | 2 +-
core/src/services/seafile/backend.rs | 2 +-
core/src/services/seafile/core.rs | 2 +-
core/src/services/sftp/backend.rs | 2 +-
core/src/services/sftp/core.rs | 2 +-
core/src/services/sqlite/backend.rs | 4 +-
core/src/services/sqlite/core.rs | 7 ++--
core/src/services/surrealdb/backend.rs | 2 +-
core/src/services/surrealdb/core.rs | 2 +-
core/src/services/tikv/backend.rs | 2 +-
core/src/services/tikv/core.rs | 55 ++++++++++++-------------
core/src/services/webhdfs/backend.rs | 2 +-
core/src/services/webhdfs/core.rs | 2 +-
core/src/types/context/write.rs | 2 +-
integrations/object_store/Cargo.toml | 1 +
integrations/object_store/src/service/writer.rs | 8 +---
integrations/object_store/src/store.rs | 23 ++++++-----
54 files changed, 130 insertions(+), 152 deletions(-)
diff --git a/bindings/cpp/Cargo.toml b/bindings/cpp/Cargo.toml
index 169234739..41c443734 100644
--- a/bindings/cpp/Cargo.toml
+++ b/bindings/cpp/Cargo.toml
@@ -34,6 +34,7 @@ anyhow = { version = "1.0.100" }
cxx = { version = "1.0.186" }
cxx-async = { version = "0.1.3", optional = true }
futures = { version = "0.3.31" }
+mea = { version = "0.5.1" }
# this crate won't be published, we always use the local version
opendal = { version = ">=0", path = "../../core", features = ["blocking"] }
tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] }
diff --git a/bindings/cpp/src/async.rs b/bindings/cpp/src/async.rs
index be461ea51..d0a3d2d5d 100644
--- a/bindings/cpp/src/async.rs
+++ b/bindings/cpp/src/async.rs
@@ -17,6 +17,7 @@
use anyhow::Result;
use cxx_async::CxxAsyncException;
+use mea::mutex::Mutex;
use opendal as od;
use std::collections::HashMap;
use std::future::Future;
@@ -24,7 +25,6 @@ use std::ops::Deref;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::{Arc, OnceLock};
-use tokio::sync::Mutex;
#[cxx::bridge(namespace = opendal::ffi::async_op)]
mod ffi {
@@ -321,7 +321,7 @@ unsafe fn lister_next(lister: ffi::ListerPtr) -> RustFutureEntryOption {
fn delete_reader(reader: ffi::ReaderPtr) {
// Use blocking lock since this is called from C++ destructors
- if let Ok(mut storage) = get_reader_storage().try_lock() {
+ if let Some(mut storage) = get_reader_storage().try_lock() {
storage.remove(&reader.id);
}
// If we can't get the lock immediately, we'll just skip cleanup
@@ -330,7 +330,7 @@ fn delete_reader(reader: ffi::ReaderPtr) {
fn delete_lister(lister: ffi::ListerPtr) {
// Use blocking lock since this is called from C++ destructors
- if let Ok(mut storage) = get_lister_storage().try_lock() {
+ if let Some(mut storage) = get_lister_storage().try_lock() {
storage.remove(&lister.id);
}
// If we can't get the lock immediately, we'll just skip cleanup
diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml
index f585ab5fe..7d204be47 100644
--- a/bindings/python/Cargo.toml
+++ b/bindings/python/Cargo.toml
@@ -201,6 +201,7 @@ bytes = "1.5.0"
dict_derive = "0.6.0"
futures = "0.3.28"
jiff = { version = "0.2.15" }
+mea = { version = "0.5.1" }
# this crate won't be published, we always use the local version
opendal = { version = ">=0", path = "../../core", features = [
"blocking",
diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs
index 6e1d6a861..da744eb39 100644
--- a/bindings/python/src/file.rs
+++ b/bindings/python/src/file.rs
@@ -26,13 +26,13 @@ use std::sync::Arc;
use futures::AsyncReadExt;
use futures::AsyncSeekExt;
use futures::AsyncWriteExt;
+use mea::mutex::Mutex;
use pyo3::IntoPyObjectExt;
use pyo3::buffer::PyBuffer;
use pyo3::exceptions::PyIOError;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::future_into_py;
-use tokio::sync::Mutex;
use crate::*;
diff --git a/bindings/python/src/lister.rs b/bindings/python/src/lister.rs
index dfa5193cf..04b77ec58 100644
--- a/bindings/python/src/lister.rs
+++ b/bindings/python/src/lister.rs
@@ -18,10 +18,11 @@
use std::sync::Arc;
use futures::TryStreamExt;
+use mea::mutex::Mutex;
+use pyo3::IntoPyObjectExt;
use pyo3::exceptions::PyStopAsyncIteration;
-use pyo3::{IntoPyObjectExt, prelude::*};
+use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::future_into_py;
-use tokio::sync::Mutex;
use crate::*;
diff --git a/core/Cargo.lock b/core/Cargo.lock
index 76d6c1277..7ab3d1485 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -4770,6 +4770,15 @@ dependencies = [
"digest",
]
+[[package]]
+name = "mea"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c84bb668065d5f9eca80c5d072bb3357fa7e09fb6dbfed2f08c8f99f1749d302"
+dependencies = [
+ "slab",
+]
+
[[package]]
name = "memchr"
version = "2.7.6"
@@ -5372,6 +5381,7 @@ dependencies = [
"libtest-mimic",
"log",
"md-5",
+ "mea",
"metrics",
"mime_guess",
"mini-moka",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 5ad6f706d..16224b93b 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -252,6 +252,7 @@ http-body = "1"
jiff = { version = "0.2.15", features = ["serde"] }
log = "0.4"
md-5 = "0.10"
+mea = { version = "0.5.1" }
percent-encoding = "2"
quick-xml = { version = "0.38", features = ["serialize", "overlapped-lists"] }
reqwest = { version = "0.12.24", features = [
@@ -259,7 +260,7 @@ reqwest = { version = "0.12.24", features = [
], default-features = false }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
-tokio = { version = "1.48", features = ["sync", "io-util"] }
+tokio = { version = "1.48", features = ["io-util"] }
url = "2.5"
uuid = { version = "1", features = ["serde", "v4"] }
diff --git a/core/src/layers/concurrent_limit.rs b/core/src/layers/concurrent_limit.rs
index d51f4de45..0f42007ba 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -23,8 +23,8 @@ use std::task::Poll;
use futures::Stream;
use futures::StreamExt;
-use tokio::sync::OwnedSemaphorePermit;
-use tokio::sync::Semaphore;
+use mea::semaphore::OwnedSemaphorePermit;
+use mea::semaphore::Semaphore;
use crate::raw::*;
use crate::*;
@@ -139,10 +139,7 @@ impl HttpFetch for ConcurrentLimitHttpFetcher {
return self.inner.fetch(req).await;
};
- let permit = semaphore
- .acquire_owned()
- .await
- .expect("semaphore must be valid");
+ let permit = semaphore.acquire_owned(1).await;
let resp = self.inner.fetch(req).await?;
let (parts, body) = resp.into_parts();
@@ -191,22 +188,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
}
async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
- let _permit = self
- .semaphore
- .acquire()
- .await
- .expect("semaphore must be valid");
+ let _permit = self.semaphore.acquire(1).await;
self.inner.create_dir(path, args).await
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
- let permit = self
- .semaphore
- .clone()
- .acquire_owned()
- .await
- .expect("semaphore must be valid");
+ let permit = self.semaphore.clone().acquire_owned(1).await;
self.inner
.read(path, args)
@@ -215,12 +203,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
}
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
- let permit = self
- .semaphore
- .clone()
- .acquire_owned()
- .await
- .expect("semaphore must be valid");
+ let permit = self.semaphore.clone().acquire_owned(1).await;
self.inner
.write(path, args)
@@ -229,22 +212,13 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
}
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
- let _permit = self
- .semaphore
- .acquire()
- .await
- .expect("semaphore must be valid");
+ let _permit = self.semaphore.acquire(1).await;
self.inner.stat(path, args).await
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
- let permit = self
- .semaphore
- .clone()
- .acquire_owned()
- .await
- .expect("semaphore must be valid");
+ let permit = self.semaphore.clone().acquire_owned(1).await;
self.inner
.delete()
@@ -253,12 +227,7 @@ impl<A: Access> LayeredAccess for ConcurrentLimitAccessor<A> {
}
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
- let permit = self
- .semaphore
- .clone()
- .acquire_owned()
- .await
- .expect("semaphore must be valid");
+ let permit = self.semaphore.clone().acquire_owned(1).await;
self.inner
.list(path, args)
diff --git a/core/src/raw/oio/write/multipart_write.rs
b/core/src/raw/oio/write/multipart_write.rs
index 03bb8d270..403774826 100644
--- a/core/src/raw/oio/write/multipart_write.rs
+++ b/core/src/raw/oio/write/multipart_write.rs
@@ -309,11 +309,11 @@ where
mod tests {
use std::time::Duration;
+ use mea::mutex::Mutex;
use pretty_assertions::assert_eq;
use rand::Rng;
use rand::RngCore;
use rand::thread_rng;
- use tokio::sync::Mutex;
use tokio::time::sleep;
use tokio::time::timeout;
diff --git a/core/src/raw/path_cache.rs b/core/src/raw/path_cache.rs
index ef8f51253..1d044f38e 100644
--- a/core/src/raw/path_cache.rs
+++ b/core/src/raw/path_cache.rs
@@ -18,9 +18,9 @@
use std::collections::VecDeque;
use futures::Future;
+use mea::mutex::Mutex;
+use mea::mutex::MutexGuard;
use moka::sync::Cache;
-use tokio::sync::Mutex;
-use tokio::sync::MutexGuard;
use crate::raw::*;
use crate::*;
diff --git a/core/src/services/aliyun_drive/backend.rs
b/core/src/services/aliyun_drive/backend.rs
index a88927650..27fdb6da7 100644
--- a/core/src/services/aliyun_drive/backend.rs
+++ b/core/src/services/aliyun_drive/backend.rs
@@ -22,7 +22,7 @@ use bytes::Buf;
use http::Response;
use http::StatusCode;
use log::debug;
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
use super::ALIYUN_DRIVE_SCHEME;
use super::config::AliyunDriveConfig;
diff --git a/core/src/services/aliyun_drive/core.rs
b/core/src/services/aliyun_drive/core.rs
index eb20a7867..05cee528b 100644
--- a/core/src/services/aliyun_drive/core.rs
+++ b/core/src/services/aliyun_drive/core.rs
@@ -22,11 +22,11 @@ use bytes::Buf;
use http::Method;
use http::Request;
use http::Response;
+use http::header;
use http::header::HeaderValue;
-use http::header::{self};
+use mea::mutex::Mutex;
use serde::Deserialize;
use serde::Serialize;
-use tokio::sync::Mutex;
use super::error::parse_error;
use crate::raw::*;
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
index b4a7bfc0b..1f78f0ddc 100644
--- a/core/src/services/b2/backend.rs
+++ b/core/src/services/b2/backend.rs
@@ -22,7 +22,7 @@ use http::Request;
use http::Response;
use http::StatusCode;
use log::debug;
-use tokio::sync::RwLock;
+use mea::rwlock::RwLock;
use super::B2_SCHEME;
use super::config::B2Config;
diff --git a/core/src/services/b2/core.rs b/core/src/services/b2/core.rs
index 636200a38..b88a20598 100644
--- a/core/src/services/b2/core.rs
+++ b/core/src/services/b2/core.rs
@@ -24,9 +24,9 @@ use http::Request;
use http::Response;
use http::StatusCode;
use http::header;
+use mea::rwlock::RwLock;
use serde::Deserialize;
use serde::Serialize;
-use tokio::sync::RwLock;
use self::constants::X_BZ_CONTENT_SHA1;
use self::constants::X_BZ_FILE_NAME;
diff --git a/core/src/services/dropbox/builder.rs
b/core/src/services/dropbox/builder.rs
index e9e0ec71f..bcda4694e 100644
--- a/core/src/services/dropbox/builder.rs
+++ b/core/src/services/dropbox/builder.rs
@@ -18,7 +18,7 @@
use std::fmt::Debug;
use std::sync::Arc;
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
use super::DROPBOX_SCHEME;
use super::backend::DropboxBackend;
diff --git a/core/src/services/dropbox/core.rs
b/core/src/services/dropbox/core.rs
index 30e61ce35..396c73fe2 100644
--- a/core/src/services/dropbox/core.rs
+++ b/core/src/services/dropbox/core.rs
@@ -27,9 +27,9 @@ use http::StatusCode;
use http::header;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
+use mea::mutex::Mutex;
use serde::Deserialize;
use serde::Serialize;
-use tokio::sync::Mutex;
use super::error::parse_error;
use crate::raw::*;
diff --git a/core/src/services/etcd/backend.rs
b/core/src/services/etcd/backend.rs
index 77002ca33..8a9d916f0 100644
--- a/core/src/services/etcd/backend.rs
+++ b/core/src/services/etcd/backend.rs
@@ -21,7 +21,7 @@ use etcd_client::Certificate;
use etcd_client::ConnectOptions;
use etcd_client::Identity;
use etcd_client::TlsOptions;
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
use super::ETCD_SCHEME;
use super::config::EtcdConfig;
diff --git a/core/src/services/etcd/core.rs b/core/src/services/etcd/core.rs
index e21fa9e8e..fcb07a73d 100644
--- a/core/src/services/etcd/core.rs
+++ b/core/src/services/etcd/core.rs
@@ -17,11 +17,12 @@
use std::fmt::Debug;
+use bb8::Pool;
use bb8::PooledConnection;
use bb8::RunError;
use etcd_client::Client;
use etcd_client::ConnectOptions;
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
use crate::services::etcd::error::format_etcd_error;
use crate::{Buffer, Error, ErrorKind, Result};
@@ -63,7 +64,7 @@ impl bb8::ManageConnection for Manager {
pub struct EtcdCore {
pub endpoints: Vec<String>,
pub options: ConnectOptions,
- pub client: OnceCell<bb8::Pool<Manager>>,
+ pub client: OnceCell<Pool<Manager>>,
}
impl Debug for EtcdCore {
@@ -80,7 +81,7 @@ impl EtcdCore {
let client = self
.client
.get_or_try_init(|| async {
- bb8::Pool::builder()
+ Pool::builder()
.max_size(64)
.build(Manager {
endpoints: self.endpoints.clone(),
diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs
index af7700ec4..0b0c87b13 100644
--- a/core/src/services/ftp/backend.rs
+++ b/core/src/services/ftp/backend.rs
@@ -21,12 +21,12 @@ use std::sync::Arc;
use http::Uri;
use log::debug;
+use mea::once::OnceCell;
use services::ftp::core::Manager;
use suppaftp::FtpError;
use suppaftp::Status;
use suppaftp::list::File;
use suppaftp::types::Response;
-use tokio::sync::OnceCell;
use super::FTP_SCHEME;
use super::config::FtpConfig;
diff --git a/core/src/services/ftp/core.rs b/core/src/services/ftp/core.rs
index 8232aa107..42d427096 100644
--- a/core/src/services/ftp/core.rs
+++ b/core/src/services/ftp/core.rs
@@ -21,6 +21,7 @@ use bb8::Pool;
use bb8::PooledConnection;
use bb8::RunError;
use futures_rustls::TlsConnector;
+use mea::once::OnceCell;
use raw::Operation;
use suppaftp::AsyncRustlsConnector;
use suppaftp::AsyncRustlsFtpStream;
@@ -29,7 +30,6 @@ use suppaftp::ImplAsyncFtpStream;
use suppaftp::Status;
use suppaftp::rustls::ClientConfig;
use suppaftp::types::FileType;
-use tokio::sync::OnceCell;
use super::err::parse_error;
use crate::raw::AccessorInfo;
@@ -46,7 +46,7 @@ impl FtpCore {
let pool = self
.pool
.get_or_try_init(|| async {
- bb8::Pool::builder()
+ Pool::builder()
.max_size(64)
.build(self.manager.clone())
.await
diff --git a/core/src/services/gdrive/builder.rs
b/core/src/services/gdrive/builder.rs
index df1273beb..5abe7e7ed 100644
--- a/core/src/services/gdrive/builder.rs
+++ b/core/src/services/gdrive/builder.rs
@@ -19,7 +19,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use log::debug;
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
use super::GDRIVE_SCHEME;
use super::backend::GdriveBackend;
diff --git a/core/src/services/gdrive/core.rs b/core/src/services/gdrive/core.rs
index 7e4f00db3..0b89c14b3 100644
--- a/core/src/services/gdrive/core.rs
+++ b/core/src/services/gdrive/core.rs
@@ -25,9 +25,9 @@ use http::Request;
use http::Response;
use http::StatusCode;
use http::header;
+use mea::mutex::Mutex;
use serde::Deserialize;
use serde_json::json;
-use tokio::sync::Mutex;
use super::error::parse_error;
use crate::raw::*;
diff --git a/core/src/services/gridfs/backend.rs
b/core/src/services/gridfs/backend.rs
index 71e3ae875..7519cacb8 100644
--- a/core/src/services/gridfs/backend.rs
+++ b/core/src/services/gridfs/backend.rs
@@ -17,7 +17,7 @@
use std::sync::Arc;
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
use super::GRIDFS_SCHEME;
use super::config::GridfsConfig;
diff --git a/core/src/services/gridfs/core.rs b/core/src/services/gridfs/core.rs
index 2c06e3cc6..cdc2a8ac0 100644
--- a/core/src/services/gridfs/core.rs
+++ b/core/src/services/gridfs/core.rs
@@ -19,11 +19,11 @@ use std::fmt::Debug;
use futures::AsyncReadExt;
use futures::AsyncWriteExt;
+use mea::once::OnceCell;
use mongodb::bson::doc;
use mongodb::gridfs::GridFsBucket;
use mongodb::options::ClientOptions;
use mongodb::options::GridFsBucketOptions;
-use tokio::sync::OnceCell;
use crate::raw::*;
use crate::*;
diff --git a/core/src/services/koofr/backend.rs
b/core/src/services/koofr/backend.rs
index a887b0b02..6a2481109 100644
--- a/core/src/services/koofr/backend.rs
+++ b/core/src/services/koofr/backend.rs
@@ -22,8 +22,8 @@ use bytes::Buf;
use http::Response;
use http::StatusCode;
use log::debug;
-use tokio::sync::Mutex;
-use tokio::sync::OnceCell;
+use mea::mutex::Mutex;
+use mea::once::OnceCell;
use super::KOOFR_SCHEME;
use super::config::KoofrConfig;
diff --git a/core/src/services/koofr/core.rs b/core/src/services/koofr/core.rs
index c9747501a..83c42f163 100644
--- a/core/src/services/koofr/core.rs
+++ b/core/src/services/koofr/core.rs
@@ -26,10 +26,10 @@ use http::Response;
use http::StatusCode;
use http::header;
use http::request;
+use mea::mutex::Mutex;
+use mea::once::OnceCell;
use serde::Deserialize;
use serde_json::json;
-use tokio::sync::Mutex;
-use tokio::sync::OnceCell;
use super::error::parse_error;
use crate::raw::*;
diff --git a/core/src/services/memcached/backend.rs
b/core/src/services/memcached/backend.rs
index 86d4b1877..22ce701c6 100644
--- a/core/src/services/memcached/backend.rs
+++ b/core/src/services/memcached/backend.rs
@@ -18,7 +18,7 @@
use std::sync::Arc;
use std::time::Duration;
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
use super::MEMCACHED_SCHEME;
use super::config::MemcachedConfig;
diff --git a/core/src/services/memcached/core.rs
b/core/src/services/memcached/core.rs
index c95b62ea4..add582f80 100644
--- a/core/src/services/memcached/core.rs
+++ b/core/src/services/memcached/core.rs
@@ -18,8 +18,8 @@
use std::time::Duration;
use bb8::RunError;
+use mea::once::OnceCell;
use tokio::net::TcpStream;
-use tokio::sync::OnceCell;
use super::binary;
use crate::raw::*;
@@ -95,7 +95,7 @@ impl MemcachedCore {
.build(mgr)
.await
.map_err(|err| {
- Error::new(ErrorKind::ConfigInvalid, "connect to memecached failed")
+ Error::new(ErrorKind::ConfigInvalid, "connect to memcached failed")
.set_source(err)
})
})
diff --git a/core/src/services/mongodb/backend.rs
b/core/src/services/mongodb/backend.rs
index 2068c9a2e..17513fd9f 100644
--- a/core/src/services/mongodb/backend.rs
+++ b/core/src/services/mongodb/backend.rs
@@ -17,7 +17,7 @@
use std::sync::Arc;
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
use super::MONGODB_SCHEME;
use super::config::MongodbConfig;
diff --git a/core/src/services/mongodb/core.rs
b/core/src/services/mongodb/core.rs
index 820242359..9e8a6ee82 100644
--- a/core/src/services/mongodb/core.rs
+++ b/core/src/services/mongodb/core.rs
@@ -17,11 +17,11 @@
use std::fmt::Debug;
+use mea::once::OnceCell;
use mongodb::bson::Binary;
use mongodb::bson::Document;
use mongodb::bson::doc;
use mongodb::options::ClientOptions;
-use tokio::sync::OnceCell;
use crate::*;
diff --git a/core/src/services/mysql/backend.rs
b/core/src/services/mysql/backend.rs
index bede75b6e..cb6ee4678 100644
--- a/core/src/services/mysql/backend.rs
+++ b/core/src/services/mysql/backend.rs
@@ -18,8 +18,8 @@
use std::fmt::Debug;
use std::sync::Arc;
+use mea::once::OnceCell;
use sqlx::mysql::MySqlConnectOptions;
-use tokio::sync::OnceCell;
use super::MYSQL_SCHEME;
use super::config::MysqlConfig;
diff --git a/core/src/services/mysql/core.rs b/core/src/services/mysql/core.rs
index c491106eb..3a346e9c5 100644
--- a/core/src/services/mysql/core.rs
+++ b/core/src/services/mysql/core.rs
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use mea::once::OnceCell;
use sqlx::MySqlPool;
use sqlx::mysql::MySqlConnectOptions;
-use tokio::sync::OnceCell;
use crate::*;
@@ -35,10 +35,9 @@ impl MysqlCore {
async fn get_client(&self) -> Result<&MySqlPool> {
self.pool
.get_or_try_init(|| async {
- let pool = MySqlPool::connect_with(self.config.clone())
+ MySqlPool::connect_with(self.config.clone())
.await
- .map_err(parse_mysql_error)?;
- Ok(pool)
+ .map_err(parse_mysql_error)
})
.await
}
diff --git a/core/src/services/onedrive/builder.rs
b/core/src/services/onedrive/builder.rs
index 659fa376d..a058d27f1 100644
--- a/core/src/services/onedrive/builder.rs
+++ b/core/src/services/onedrive/builder.rs
@@ -19,9 +19,9 @@ use std::fmt::Debug;
use std::sync::Arc;
use log::debug;
+use mea::mutex::Mutex;
use services::onedrive::core::OneDriveCore;
use services::onedrive::core::OneDriveSigner;
-use tokio::sync::Mutex;
use super::ONEDRIVE_SCHEME;
use super::backend::OnedriveBackend;
diff --git a/core/src/services/onedrive/core.rs
b/core/src/services/onedrive/core.rs
index 4ebd93014..1867fbb65 100644
--- a/core/src/services/onedrive/core.rs
+++ b/core/src/services/onedrive/core.rs
@@ -25,7 +25,7 @@ use http::Request;
use http::Response;
use http::StatusCode;
use http::header;
-use tokio::sync::Mutex;
+use mea::mutex::Mutex;
use super::error::parse_error;
use super::graph_model::*;
diff --git a/core/src/services/postgresql/backend.rs
b/core/src/services/postgresql/backend.rs
index 5f7900748..338f70b6d 100644
--- a/core/src/services/postgresql/backend.rs
+++ b/core/src/services/postgresql/backend.rs
@@ -17,8 +17,8 @@
use std::sync::Arc;
+use mea::once::OnceCell;
use sqlx::postgres::PgConnectOptions;
-use tokio::sync::OnceCell;
use super::POSTGRESQL_SCHEME;
use super::config::PostgresqlConfig;
diff --git a/core/src/services/postgresql/core.rs
b/core/src/services/postgresql/core.rs
index b18df9fb7..309fac33a 100644
--- a/core/src/services/postgresql/core.rs
+++ b/core/src/services/postgresql/core.rs
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use mea::once::OnceCell;
use sqlx::PgPool;
use sqlx::postgres::PgConnectOptions;
-use tokio::sync::OnceCell;
use crate::*;
@@ -35,10 +35,9 @@ impl PostgresqlCore {
async fn get_client(&self) -> Result<&PgPool> {
self.pool
.get_or_try_init(|| async {
- let pool = PgPool::connect_with(self.config.clone())
+ PgPool::connect_with(self.config.clone())
.await
- .map_err(parse_postgres_error)?;
- Ok(pool)
+ .map_err(parse_postgres_error)
})
.await
}
diff --git a/core/src/services/redis/backend.rs
b/core/src/services/redis/backend.rs
index 3c4b7619c..5365422cc 100644
--- a/core/src/services/redis/backend.rs
+++ b/core/src/services/redis/backend.rs
@@ -20,13 +20,13 @@ use std::sync::Arc;
use std::time::Duration;
use http::Uri;
+use mea::once::OnceCell;
use redis::Client;
use redis::ConnectionAddr;
use redis::ConnectionInfo;
use redis::ProtocolVersion;
use redis::RedisConnectionInfo;
use redis::cluster::ClusterClientBuilder;
-use tokio::sync::OnceCell;
use super::REDIS_SCHEME;
use super::config::RedisConfig;
diff --git a/core/src/services/redis/core.rs b/core/src/services/redis/core.rs
index 50afc909d..21f609f51 100644
--- a/core/src/services/redis/core.rs
+++ b/core/src/services/redis/core.rs
@@ -20,6 +20,7 @@ use std::time::Duration;
use bb8::RunError;
use bytes::Bytes;
+use mea::once::OnceCell;
use redis::AsyncCommands;
use redis::Client;
use redis::Cmd;
@@ -31,7 +32,6 @@ use redis::aio::ConnectionLike;
use redis::aio::ConnectionManager;
use redis::cluster::ClusterClient;
use redis::cluster_async::ClusterConnection;
-use tokio::sync::OnceCell;
use crate::*;
diff --git a/core/src/services/seafile/backend.rs
b/core/src/services/seafile/backend.rs
index 43114a8e8..d0d11de3f 100644
--- a/core/src/services/seafile/backend.rs
+++ b/core/src/services/seafile/backend.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
use http::Response;
use http::StatusCode;
use log::debug;
-use tokio::sync::RwLock;
+use mea::rwlock::RwLock;
use super::SEAFILE_SCHEME;
use super::config::SeafileConfig;
diff --git a/core/src/services/seafile/core.rs
b/core/src/services/seafile/core.rs
index 3a3c60e62..e26a1cf70 100644
--- a/core/src/services/seafile/core.rs
+++ b/core/src/services/seafile/core.rs
@@ -24,8 +24,8 @@ use http::Request;
use http::Response;
use http::StatusCode;
use http::header;
+use mea::rwlock::RwLock;
use serde::Deserialize;
-use tokio::sync::RwLock;
use super::error::parse_error;
use crate::raw::*;
diff --git a/core/src/services/sftp/backend.rs
b/core/src/services/sftp/backend.rs
index b5758c470..ca2e17a81 100644
--- a/core/src/services/sftp/backend.rs
+++ b/core/src/services/sftp/backend.rs
@@ -21,9 +21,9 @@ use std::path::PathBuf;
use std::sync::Arc;
use log::debug;
+use mea::once::OnceCell;
use openssh::KnownHosts;
use tokio::io::AsyncSeekExt;
-use tokio::sync::OnceCell;
use super::SFTP_SCHEME;
use super::config::SftpConfig;
diff --git a/core/src/services/sftp/core.rs b/core/src/services/sftp/core.rs
index ade9f200c..327cf57af 100644
--- a/core/src/services/sftp/core.rs
+++ b/core/src/services/sftp/core.rs
@@ -23,11 +23,11 @@ use std::sync::Arc;
use bb8::PooledConnection;
use bb8::RunError;
use log::debug;
+use mea::once::OnceCell;
use openssh::KnownHosts;
use openssh::SessionBuilder;
use openssh_sftp_client::Sftp;
use openssh_sftp_client::SftpOptions;
-use tokio::sync::OnceCell;
use super::error::is_sftp_protocol_error;
use super::error::parse_sftp_error;
diff --git a/core/src/services/sqlite/backend.rs
b/core/src/services/sqlite/backend.rs
index 5ed83264f..8b05e765b 100644
--- a/core/src/services/sqlite/backend.rs
+++ b/core/src/services/sqlite/backend.rs
@@ -18,8 +18,8 @@
use std::str::FromStr;
use std::sync::Arc;
+use mea::once::OnceCell;
use sqlx::sqlite::SqliteConnectOptions;
-use tokio::sync::OnceCell;
use super::SQLITE_SCHEME;
use super::config::SqliteConfig;
@@ -319,7 +319,7 @@ mod test {
async fn build_client() -> OnceCell<SqlitePool> {
let config = SqliteConnectOptions::from_str("sqlite::memory:").unwrap();
let pool = SqlitePool::connect_with(config).await.unwrap();
- OnceCell::new_with(Some(pool))
+ OnceCell::from_value(pool)
}
#[tokio::test]
diff --git a/core/src/services/sqlite/core.rs b/core/src/services/sqlite/core.rs
index 15d4ae2c4..fa55643e9 100644
--- a/core/src/services/sqlite/core.rs
+++ b/core/src/services/sqlite/core.rs
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+use mea::once::OnceCell;
use sqlx::SqlitePool;
use sqlx::sqlite::SqliteConnectOptions;
use std::fmt::Debug;
-use tokio::sync::OnceCell;
use crate::services::sqlite::backend::parse_sqlite_error;
use crate::*;
@@ -37,10 +37,9 @@ impl SqliteCore {
pub async fn get_client(&self) -> Result<&SqlitePool> {
self.pool
.get_or_try_init(|| async {
- let pool = SqlitePool::connect_with(self.config.clone())
+ SqlitePool::connect_with(self.config.clone())
.await
- .map_err(parse_sqlite_error)?;
- Ok(pool)
+ .map_err(parse_sqlite_error)
})
.await
}
diff --git a/core/src/services/surrealdb/backend.rs
b/core/src/services/surrealdb/backend.rs
index d7a8b30ad..15f0a1595 100644
--- a/core/src/services/surrealdb/backend.rs
+++ b/core/src/services/surrealdb/backend.rs
@@ -18,7 +18,7 @@
use std::fmt::Debug;
use std::sync::Arc;
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
use super::SURREALDB_SCHEME;
use super::config::SurrealdbConfig;
diff --git a/core/src/services/surrealdb/core.rs
b/core/src/services/surrealdb/core.rs
index 5e7755b23..2b543e711 100644
--- a/core/src/services/surrealdb/core.rs
+++ b/core/src/services/surrealdb/core.rs
@@ -18,10 +18,10 @@
use std::fmt::Debug;
use std::sync::Arc;
+use mea::once::OnceCell;
use surrealdb::Surreal;
use surrealdb::engine::any::Any;
use surrealdb::opt::auth::Database;
-use tokio::sync::OnceCell;
use crate::*;
diff --git a/core/src/services/tikv/backend.rs
b/core/src/services/tikv/backend.rs
index 681e357f8..ca5c77b26 100644
--- a/core/src/services/tikv/backend.rs
+++ b/core/src/services/tikv/backend.rs
@@ -18,7 +18,7 @@
use std::fmt::Debug;
use std::sync::Arc;
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
use super::TIKV_SCHEME;
use super::config::TikvConfig;
diff --git a/core/src/services/tikv/core.rs b/core/src/services/tikv/core.rs
index 55ca8539c..363bf665e 100644
--- a/core/src/services/tikv/core.rs
+++ b/core/src/services/tikv/core.rs
@@ -17,9 +17,9 @@
use std::fmt::Debug;
+use mea::once::OnceCell;
use tikv_client::Config;
use tikv_client::RawClient;
-use tokio::sync::OnceCell;
use super::TIKV_SCHEME;
use crate::*;
@@ -45,33 +45,32 @@ impl Debug for TikvCore {
}
impl TikvCore {
- async fn get_connection(&self) -> Result<RawClient> {
- if let Some(client) = self.client.get() {
- return Ok(client.clone());
- }
- let client = if self.insecure {
- RawClient::new(self.endpoints.clone())
- .await
- .map_err(parse_tikv_config_error)?
- } else if self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some() {
- let (ca_path, key_path, cert_path) = (
- self.ca_path.clone().unwrap(),
- self.key_path.clone().unwrap(),
- self.cert_path.clone().unwrap(),
- );
- let config = Config::default().with_security(ca_path, cert_path, key_path);
- RawClient::new_with_config(self.endpoints.clone(), config)
- .await
- .map_err(parse_tikv_config_error)?
- } else {
- return Err(
- Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
- .with_context("service", TIKV_SCHEME)
- .with_context("endpoints", format!("{:?}", self.endpoints)),
- );
- };
- self.client.set(client.clone()).ok();
- Ok(client)
+ async fn get_connection(&self) -> Result<&RawClient> {
+ self.client
+ .get_or_try_init(|| async {
+ if self.insecure {
+ return RawClient::new(self.endpoints.clone())
+ .await
+ .map_err(parse_tikv_config_error);
+ }
+
+ if let Some(ca_path) = self.ca_path.as_ref()
+ && let Some(key_path) = self.key_path.as_ref()
+ && let Some(cert_path) = self.cert_path.as_ref()
+ {
+ let config = Config::default().with_security(ca_path, cert_path, key_path);
+ return RawClient::new_with_config(self.endpoints.clone(), config)
+ .await
+ .map_err(parse_tikv_config_error);
+ }
+
+ Err(
+ Error::new(ErrorKind::ConfigInvalid, "invalid configuration")
+ .with_context("service", TIKV_SCHEME)
+ .with_context("endpoints", format!("{:?}", self.endpoints)),
+ )
+ })
+ .await
}
pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
diff --git a/core/src/services/webhdfs/backend.rs
b/core/src/services/webhdfs/backend.rs
index 8913d0666..b0043cd6f 100644
--- a/core/src/services/webhdfs/backend.rs
+++ b/core/src/services/webhdfs/backend.rs
@@ -22,7 +22,7 @@ use bytes::Buf;
use http::Response;
use http::StatusCode;
use log::debug;
-use tokio::sync::OnceCell;
+use mea::once::OnceCell;
use super::WEBHDFS_SCHEME;
use super::config::WebhdfsConfig;
diff --git a/core/src/services/webhdfs/core.rs
b/core/src/services/webhdfs/core.rs
index a37725bee..4fef4886b 100644
--- a/core/src/services/webhdfs/core.rs
+++ b/core/src/services/webhdfs/core.rs
@@ -24,8 +24,8 @@ use http::Response;
use http::StatusCode;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
+use mea::once::OnceCell;
use serde::Deserialize;
-use tokio::sync::OnceCell;
use super::error::parse_error;
use crate::raw::*;
diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs
index b16799f38..cc6c7720c 100644
--- a/core/src/types/context/write.rs
+++ b/core/src/types/context/write.rs
@@ -210,13 +210,13 @@ mod tests {
use bytes::BufMut;
use bytes::Bytes;
use log::debug;
+ use mea::mutex::Mutex;
use pretty_assertions::assert_eq;
use rand::Rng;
use rand::RngCore;
use rand::thread_rng;
use sha2::Digest;
use sha2::Sha256;
- use tokio::sync::Mutex;
use super::*;
use crate::raw::oio::Write;
diff --git a/integrations/object_store/Cargo.toml
b/integrations/object_store/Cargo.toml
index 534f17310..00dbf5bcc 100644
--- a/integrations/object_store/Cargo.toml
+++ b/integrations/object_store/Cargo.toml
@@ -41,6 +41,7 @@ async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.42", features = ["std", "clock"] }
futures = "0.3"
+mea = { version = "0.5.0" }
object_store = "0.12.3"
opendal = { version = "0.55.0", path = "../../core", default-features = false }
pin-project = "1.1"
diff --git a/integrations/object_store/src/service/writer.rs
b/integrations/object_store/src/service/writer.rs
index 5dc9ec809..9b6c07dc6 100644
--- a/integrations/object_store/src/service/writer.rs
+++ b/integrations/object_store/src/service/writer.rs
@@ -23,10 +23,10 @@ use object_store::PutPayload;
use object_store::path::Path as ObjectStorePath;
use object_store::{Attribute, AttributeValue};
+use mea::mutex::Mutex;
use opendal::raw::oio::MultipartPart;
use opendal::raw::*;
use opendal::*;
-use tokio::sync::Mutex;
use super::core::{format_put_multipart_options, format_put_result,
parse_op_write};
use super::error::parse_error;
@@ -160,11 +160,7 @@ impl oio::MultipartWrite for ObjectStoreWriter {
Ok(multipart_part)
}
- async fn complete_part(
- &self,
- _upload_id: &str,
- parts: &[oio::MultipartPart],
- ) -> Result<Metadata> {
+ async fn complete_part(&self, _upload_id: &str, parts: &[MultipartPart])
-> Result<Metadata> {
// Validate that we have parts to complete
if parts.is_empty() {
return Err(Error::new(
diff --git a/integrations/object_store/src/store.rs
b/integrations/object_store/src/store.rs
index 177ce72d5..08a377c18 100644
--- a/integrations/object_store/src/store.rs
+++ b/integrations/object_store/src/store.rs
@@ -27,6 +27,8 @@ use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::stream::BoxStream;
+use mea::mutex::Mutex;
+use mea::oneshot;
use object_store::ListResult;
use object_store::MultipartUpload;
use object_store::ObjectMeta;
@@ -45,7 +47,6 @@ use opendal::options::CopyOptions;
use opendal::raw::percent_decode_path;
use opendal::{Operator, OperatorInfo};
use std::collections::HashMap;
-use tokio::sync::{Mutex, Notify};
/// OpendalStore implements ObjectStore trait by using opendal.
///
@@ -608,15 +609,18 @@ impl ObjectStore for OpendalStore {
struct OpendalMultipartUpload {
writer: Arc<Mutex<Writer>>,
location: Path,
- next_notify: Option<Arc<Notify>>,
+ next_notify: oneshot::Receiver<()>,
}
impl OpendalMultipartUpload {
fn new(writer: Writer, location: Path) -> Self {
+ // The sender is dropped immediately so that the first part can be written without waiting.
+ let (_, rx) = oneshot::channel();
+
Self {
writer: Arc::new(Mutex::new(writer)),
location,
- next_notify: None,
+ next_notify: rx,
}
}
}
@@ -628,16 +632,13 @@ impl MultipartUpload for OpendalMultipartUpload {
let location = self.location.clone();
// Generate next notify which will be notified after the current part
is written.
- let next_notify = Arc::new(Notify::new());
+ let (tx, rx) = oneshot::channel();
// Fetch the notify for current part to wait for it to be written.
- let current_notify = self.next_notify.replace(next_notify.clone());
+ let last_rx = std::mem::replace(&mut self.next_notify, rx);
async move {
- // current_notify == None means that it's the first part, we don't
need to wait.
- if let Some(notify) = current_notify {
- // Wait for the previous part to be written
- notify.notified().await;
- }
+ // Wait for the previous part to be written
+ let _ = last_rx.await;
let mut writer = writer.lock().await;
let result = writer
@@ -646,7 +647,7 @@ impl MultipartUpload for OpendalMultipartUpload {
.map_err(|err| format_object_store_error(err,
location.as_ref()));
// Notify the next part to be written
- next_notify.notify_one();
+ drop(tx);
result
}