This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch use-mea-sync in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 617dbbc1916ad167e88476b84a8c15008059a250 Author: tison <[email protected]> AuthorDate: Tue Nov 25 14:44:16 2025 +0800 for oncecell Signed-off-by: tison <[email protected]> --- core/src/services/etcd/backend.rs | 2 +- core/src/services/etcd/core.rs | 7 +++-- core/src/services/ftp/backend.rs | 2 +- core/src/services/ftp/core.rs | 4 +-- core/src/services/gridfs/backend.rs | 2 +- core/src/services/gridfs/core.rs | 2 +- core/src/services/koofr/backend.rs | 2 +- core/src/services/koofr/core.rs | 2 +- core/src/services/memcached/backend.rs | 2 +- core/src/services/memcached/core.rs | 4 +-- core/src/services/mongodb/backend.rs | 2 +- core/src/services/mongodb/core.rs | 2 +- core/src/services/mysql/backend.rs | 2 +- core/src/services/mysql/core.rs | 7 ++--- core/src/services/postgresql/backend.rs | 2 +- core/src/services/postgresql/core.rs | 7 ++--- core/src/services/redis/backend.rs | 2 +- core/src/services/redis/core.rs | 2 +- core/src/services/sftp/backend.rs | 2 +- core/src/services/sftp/core.rs | 2 +- core/src/services/sqlite/backend.rs | 2 +- core/src/services/sqlite/core.rs | 7 ++--- core/src/services/surrealdb/backend.rs | 2 +- core/src/services/surrealdb/core.rs | 2 +- core/src/services/tikv/backend.rs | 2 +- core/src/services/tikv/core.rs | 55 ++++++++++++++++----------------- core/src/services/webhdfs/backend.rs | 2 +- core/src/services/webhdfs/core.rs | 2 +- 28 files changed, 65 insertions(+), 68 deletions(-) diff --git a/core/src/services/etcd/backend.rs b/core/src/services/etcd/backend.rs index 77002ca33..8a9d916f0 100644 --- a/core/src/services/etcd/backend.rs +++ b/core/src/services/etcd/backend.rs @@ -21,7 +21,7 @@ use etcd_client::Certificate; use etcd_client::ConnectOptions; use etcd_client::Identity; use etcd_client::TlsOptions; -use tokio::sync::OnceCell; +use mea::once::OnceCell; use super::ETCD_SCHEME; use super::config::EtcdConfig; diff --git a/core/src/services/etcd/core.rs b/core/src/services/etcd/core.rs index e21fa9e8e..fcb07a73d 100644 --- a/core/src/services/etcd/core.rs +++ b/core/src/services/etcd/core.rs @@ -17,11 +17,12 @@ use std::fmt::Debug; +use bb8::Pool; use bb8::PooledConnection; use bb8::RunError; use etcd_client::Client; use etcd_client::ConnectOptions; -use tokio::sync::OnceCell; +use mea::once::OnceCell; use crate::services::etcd::error::format_etcd_error; use crate::{Buffer, Error, ErrorKind, Result}; @@ -63,7 +64,7 @@ impl bb8::ManageConnection for Manager { pub struct EtcdCore { pub endpoints: Vec<String>, pub options: ConnectOptions, - pub client: OnceCell<bb8::Pool<Manager>>, + pub client: OnceCell<Pool<Manager>>, } impl Debug for EtcdCore { @@ -80,7 +81,7 @@ impl EtcdCore { let client = self .client .get_or_try_init(|| async { - bb8::Pool::builder() + Pool::builder() .max_size(64) .build(Manager { endpoints: self.endpoints.clone(), diff --git a/core/src/services/ftp/backend.rs b/core/src/services/ftp/backend.rs index af7700ec4..0b0c87b13 100644 --- a/core/src/services/ftp/backend.rs +++ b/core/src/services/ftp/backend.rs @@ -21,12 +21,12 @@ use std::sync::Arc; use http::Uri; use log::debug; +use mea::once::OnceCell; use services::ftp::core::Manager; use suppaftp::FtpError; use suppaftp::Status; use suppaftp::list::File; use suppaftp::types::Response; -use tokio::sync::OnceCell; use super::FTP_SCHEME; use super::config::FtpConfig; diff --git a/core/src/services/ftp/core.rs b/core/src/services/ftp/core.rs index 8232aa107..42d427096 100644 --- a/core/src/services/ftp/core.rs +++ b/core/src/services/ftp/core.rs @@ -21,6 +21,7 @@ use bb8::Pool; use bb8::PooledConnection; use bb8::RunError; use futures_rustls::TlsConnector; +use mea::once::OnceCell; use raw::Operation; use suppaftp::AsyncRustlsConnector; use suppaftp::AsyncRustlsFtpStream; @@ -29,7 +30,6 @@ use suppaftp::ImplAsyncFtpStream; use suppaftp::Status; use suppaftp::rustls::ClientConfig; use suppaftp::types::FileType; -use tokio::sync::OnceCell; use super::err::parse_error; use crate::raw::AccessorInfo; @@ -46,7 +46,7 @@ impl FtpCore { let pool = self .pool .get_or_try_init(|| async { - bb8::Pool::builder() + Pool::builder() .max_size(64) .build(self.manager.clone()) .await diff --git a/core/src/services/gridfs/backend.rs b/core/src/services/gridfs/backend.rs index 71e3ae875..7519cacb8 100644 --- a/core/src/services/gridfs/backend.rs +++ b/core/src/services/gridfs/backend.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use tokio::sync::OnceCell; +use mea::once::OnceCell; use super::GRIDFS_SCHEME; use super::config::GridfsConfig; diff --git a/core/src/services/gridfs/core.rs b/core/src/services/gridfs/core.rs index 2c06e3cc6..cdc2a8ac0 100644 --- a/core/src/services/gridfs/core.rs +++ b/core/src/services/gridfs/core.rs @@ -19,11 +19,11 @@ use std::fmt::Debug; use futures::AsyncReadExt; use futures::AsyncWriteExt; +use mea::once::OnceCell; use mongodb::bson::doc; use mongodb::gridfs::GridFsBucket; use mongodb::options::ClientOptions; use mongodb::options::GridFsBucketOptions; -use tokio::sync::OnceCell; use crate::raw::*; use crate::*; diff --git a/core/src/services/koofr/backend.rs b/core/src/services/koofr/backend.rs index e129deded..6a2481109 100644 --- a/core/src/services/koofr/backend.rs +++ b/core/src/services/koofr/backend.rs @@ -23,7 +23,7 @@ use http::Response; use http::StatusCode; use log::debug; use mea::mutex::Mutex; -use tokio::sync::OnceCell; +use mea::once::OnceCell; use super::KOOFR_SCHEME; use super::config::KoofrConfig; diff --git a/core/src/services/koofr/core.rs b/core/src/services/koofr/core.rs index a12926947..83c42f163 100644 --- a/core/src/services/koofr/core.rs +++ b/core/src/services/koofr/core.rs @@ -27,9 +27,9 @@ use http::StatusCode; use http::header; use http::request; use mea::mutex::Mutex; +use mea::once::OnceCell; use serde::Deserialize; use serde_json::json; -use tokio::sync::OnceCell; use super::error::parse_error; use crate::raw::*; diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs index 86d4b1877..22ce701c6 100644 --- a/core/src/services/memcached/backend.rs +++ b/core/src/services/memcached/backend.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use std::time::Duration; -use tokio::sync::OnceCell; +use mea::once::OnceCell; use super::MEMCACHED_SCHEME; use super::config::MemcachedConfig; diff --git a/core/src/services/memcached/core.rs b/core/src/services/memcached/core.rs index c95b62ea4..add582f80 100644 --- a/core/src/services/memcached/core.rs +++ b/core/src/services/memcached/core.rs @@ -18,8 +18,8 @@ use std::time::Duration; use bb8::RunError; +use mea::once::OnceCell; use tokio::net::TcpStream; -use tokio::sync::OnceCell; use super::binary; use crate::raw::*; @@ -95,7 +95,7 @@ impl MemcachedCore { .build(mgr) .await .map_err(|err| { - Error::new(ErrorKind::ConfigInvalid, "connect to memecached failed") + Error::new(ErrorKind::ConfigInvalid, "connect to memcached failed") .set_source(err) }) }) diff --git a/core/src/services/mongodb/backend.rs b/core/src/services/mongodb/backend.rs index 2068c9a2e..17513fd9f 100644 --- a/core/src/services/mongodb/backend.rs +++ b/core/src/services/mongodb/backend.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use tokio::sync::OnceCell; +use mea::once::OnceCell; use super::MONGODB_SCHEME; use super::config::MongodbConfig; diff --git a/core/src/services/mongodb/core.rs b/core/src/services/mongodb/core.rs index 820242359..9e8a6ee82 100644 --- a/core/src/services/mongodb/core.rs +++ b/core/src/services/mongodb/core.rs @@ -17,11 +17,11 @@ use std::fmt::Debug; +use mea::once::OnceCell; use mongodb::bson::Binary; use mongodb::bson::Document; use mongodb::bson::doc; use mongodb::options::ClientOptions; -use tokio::sync::OnceCell; use crate::*; diff --git a/core/src/services/mysql/backend.rs b/core/src/services/mysql/backend.rs index bede75b6e..cb6ee4678 100644 --- a/core/src/services/mysql/backend.rs +++ b/core/src/services/mysql/backend.rs @@ -18,8 +18,8 @@ use std::fmt::Debug; use std::sync::Arc; +use mea::once::OnceCell; use sqlx::mysql::MySqlConnectOptions; -use tokio::sync::OnceCell; use super::MYSQL_SCHEME; use super::config::MysqlConfig; diff --git a/core/src/services/mysql/core.rs b/core/src/services/mysql/core.rs index c491106eb..3a346e9c5 100644 --- a/core/src/services/mysql/core.rs +++ b/core/src/services/mysql/core.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. +use mea::once::OnceCell; use sqlx::MySqlPool; use sqlx::mysql::MySqlConnectOptions; -use tokio::sync::OnceCell; use crate::*; @@ -35,10 +35,9 @@ impl MysqlCore { async fn get_client(&self) -> Result<&MySqlPool> { self.pool .get_or_try_init(|| async { - let pool = MySqlPool::connect_with(self.config.clone()) + MySqlPool::connect_with(self.config.clone()) .await - .map_err(parse_mysql_error)?; - Ok(pool) + .map_err(parse_mysql_error) }) .await } diff --git a/core/src/services/postgresql/backend.rs b/core/src/services/postgresql/backend.rs index 5f7900748..338f70b6d 100644 --- a/core/src/services/postgresql/backend.rs +++ b/core/src/services/postgresql/backend.rs @@ -17,8 +17,8 @@ use std::sync::Arc; +use mea::once::OnceCell; use sqlx::postgres::PgConnectOptions; -use tokio::sync::OnceCell; use super::POSTGRESQL_SCHEME; use super::config::PostgresqlConfig; diff --git a/core/src/services/postgresql/core.rs b/core/src/services/postgresql/core.rs index b18df9fb7..309fac33a 100644 --- a/core/src/services/postgresql/core.rs +++ b/core/src/services/postgresql/core.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. +use mea::once::OnceCell; use sqlx::PgPool; use sqlx::postgres::PgConnectOptions; -use tokio::sync::OnceCell; use crate::*; @@ -35,10 +35,9 @@ impl PostgresqlCore { async fn get_client(&self) -> Result<&PgPool> { self.pool .get_or_try_init(|| async { - let pool = PgPool::connect_with(self.config.clone()) + PgPool::connect_with(self.config.clone()) .await - .map_err(parse_postgres_error)?; - Ok(pool) + .map_err(parse_postgres_error) }) .await } diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs index 3c4b7619c..5365422cc 100644 --- a/core/src/services/redis/backend.rs +++ b/core/src/services/redis/backend.rs @@ -20,13 +20,13 @@ use std::sync::Arc; use std::time::Duration; use http::Uri; +use mea::once::OnceCell; use redis::Client; use redis::ConnectionAddr; use redis::ConnectionInfo; use redis::ProtocolVersion; use redis::RedisConnectionInfo; use redis::cluster::ClusterClientBuilder; -use tokio::sync::OnceCell; use super::REDIS_SCHEME; use super::config::RedisConfig; diff --git a/core/src/services/redis/core.rs b/core/src/services/redis/core.rs index 50afc909d..21f609f51 100644 --- a/core/src/services/redis/core.rs +++ b/core/src/services/redis/core.rs @@ -20,6 +20,7 @@ use std::time::Duration; use bb8::RunError; use bytes::Bytes; +use mea::once::OnceCell; use redis::AsyncCommands; use redis::Client; use redis::Cmd; @@ -31,7 +32,6 @@ use redis::aio::ConnectionLike; use redis::aio::ConnectionManager; use redis::cluster::ClusterClient; use redis::cluster_async::ClusterConnection; -use tokio::sync::OnceCell; use crate::*; diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index b5758c470..ca2e17a81 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -21,9 +21,9 @@ use std::path::PathBuf; use std::sync::Arc; use log::debug; +use mea::once::OnceCell; use openssh::KnownHosts; use tokio::io::AsyncSeekExt; -use tokio::sync::OnceCell; use super::SFTP_SCHEME; use super::config::SftpConfig; diff --git a/core/src/services/sftp/core.rs b/core/src/services/sftp/core.rs index ade9f200c..327cf57af 100644 --- a/core/src/services/sftp/core.rs +++ b/core/src/services/sftp/core.rs @@ -23,11 +23,11 @@ use std::sync::Arc; use bb8::PooledConnection; use bb8::RunError; use log::debug; +use mea::once::OnceCell; use openssh::KnownHosts; use openssh::SessionBuilder; use openssh_sftp_client::Sftp; use openssh_sftp_client::SftpOptions; -use tokio::sync::OnceCell; use super::error::is_sftp_protocol_error; use super::error::parse_sftp_error; diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs index 5ed83264f..9e2ce9545 100644 --- a/core/src/services/sqlite/backend.rs +++ b/core/src/services/sqlite/backend.rs @@ -18,8 +18,8 @@ use std::str::FromStr; use std::sync::Arc; +use mea::once::OnceCell; use sqlx::sqlite::SqliteConnectOptions; -use tokio::sync::OnceCell; use super::SQLITE_SCHEME; use super::config::SqliteConfig; diff --git a/core/src/services/sqlite/core.rs b/core/src/services/sqlite/core.rs index 15d4ae2c4..fa55643e9 100644 --- a/core/src/services/sqlite/core.rs +++ b/core/src/services/sqlite/core.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. +use mea::once::OnceCell; use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; use std::fmt::Debug; -use tokio::sync::OnceCell; use crate::services::sqlite::backend::parse_sqlite_error; use crate::*; @@ -37,10 +37,9 @@ impl SqliteCore { pub async fn get_client(&self) -> Result<&SqlitePool> { self.pool .get_or_try_init(|| async { - let pool = SqlitePool::connect_with(self.config.clone()) + SqlitePool::connect_with(self.config.clone()) .await - .map_err(parse_sqlite_error)?; - Ok(pool) + .map_err(parse_sqlite_error) }) .await } diff --git a/core/src/services/surrealdb/backend.rs b/core/src/services/surrealdb/backend.rs index d7a8b30ad..15f0a1595 100644 --- a/core/src/services/surrealdb/backend.rs +++ b/core/src/services/surrealdb/backend.rs @@ -18,7 +18,7 @@ use std::fmt::Debug; use std::sync::Arc; -use tokio::sync::OnceCell; +use mea::once::OnceCell; use super::SURREALDB_SCHEME; use super::config::SurrealdbConfig; diff --git a/core/src/services/surrealdb/core.rs b/core/src/services/surrealdb/core.rs index 5e7755b23..2b543e711 100644 --- a/core/src/services/surrealdb/core.rs +++ b/core/src/services/surrealdb/core.rs @@ -18,10 +18,10 @@ use std::fmt::Debug; use std::sync::Arc; +use mea::once::OnceCell; use surrealdb::Surreal; use surrealdb::engine::any::Any; use surrealdb::opt::auth::Database; -use tokio::sync::OnceCell; use crate::*; diff --git a/core/src/services/tikv/backend.rs b/core/src/services/tikv/backend.rs index 681e357f8..ca5c77b26 100644 --- a/core/src/services/tikv/backend.rs +++ b/core/src/services/tikv/backend.rs @@ -18,7 +18,7 @@ use std::fmt::Debug; use std::sync::Arc; -use tokio::sync::OnceCell; +use mea::once::OnceCell; use super::TIKV_SCHEME; use super::config::TikvConfig; diff --git a/core/src/services/tikv/core.rs b/core/src/services/tikv/core.rs index 55ca8539c..363bf665e 100644 --- a/core/src/services/tikv/core.rs +++ b/core/src/services/tikv/core.rs @@ -17,9 +17,9 @@ use std::fmt::Debug; +use mea::once::OnceCell; use tikv_client::Config; use tikv_client::RawClient; -use tokio::sync::OnceCell; use super::TIKV_SCHEME; use crate::*; @@ -45,33 +45,32 @@ impl Debug for TikvCore { } impl TikvCore { - async fn get_connection(&self) -> Result<RawClient> { - if let Some(client) = self.client.get() { - return Ok(client.clone()); - } - let client = if self.insecure { - RawClient::new(self.endpoints.clone()) - .await - .map_err(parse_tikv_config_error)? - } else if self.ca_path.is_some() && self.key_path.is_some() && self.cert_path.is_some() { - let (ca_path, key_path, cert_path) = ( - self.ca_path.clone().unwrap(), - self.key_path.clone().unwrap(), - self.cert_path.clone().unwrap(), - ); - let config = Config::default().with_security(ca_path, cert_path, key_path); - RawClient::new_with_config(self.endpoints.clone(), config) - .await - .map_err(parse_tikv_config_error)? - } else { - return Err( - Error::new(ErrorKind::ConfigInvalid, "invalid configuration") - .with_context("service", TIKV_SCHEME) - .with_context("endpoints", format!("{:?}", self.endpoints)), - ); - }; - self.client.set(client.clone()).ok(); - Ok(client) + async fn get_connection(&self) -> Result<&RawClient> { + self.client + .get_or_try_init(|| async { + if self.insecure { + return RawClient::new(self.endpoints.clone()) + .await + .map_err(parse_tikv_config_error); + } + + if let Some(ca_path) = self.ca_path.as_ref() + && let Some(key_path) = self.key_path.as_ref() + && let Some(cert_path) = self.cert_path.as_ref() + { + let config = Config::default().with_security(ca_path, cert_path, key_path); + return RawClient::new_with_config(self.endpoints.clone(), config) + .await + .map_err(parse_tikv_config_error); + } + + Err( + Error::new(ErrorKind::ConfigInvalid, "invalid configuration") + .with_context("service", TIKV_SCHEME) + .with_context("endpoints", format!("{:?}", self.endpoints)), + ) + }) + .await } pub async fn get(&self, path: &str) -> Result<Option<Buffer>> { diff --git a/core/src/services/webhdfs/backend.rs b/core/src/services/webhdfs/backend.rs index 8913d0666..b0043cd6f 100644 --- a/core/src/services/webhdfs/backend.rs +++ b/core/src/services/webhdfs/backend.rs @@ -22,7 +22,7 @@ use bytes::Buf; use http::Response; use http::StatusCode; use log::debug; -use tokio::sync::OnceCell; +use mea::once::OnceCell; use super::WEBHDFS_SCHEME; use super::config::WebhdfsConfig; diff --git a/core/src/services/webhdfs/core.rs b/core/src/services/webhdfs/core.rs index a37725bee..4fef4886b 100644 --- a/core/src/services/webhdfs/core.rs +++ b/core/src/services/webhdfs/core.rs @@ -24,8 +24,8 @@ use http::Response; use http::StatusCode; use http::header::CONTENT_LENGTH; use http::header::CONTENT_TYPE; +use mea::once::OnceCell; use serde::Deserialize; -use tokio::sync::OnceCell; use super::error::parse_error; use crate::raw::*;
