This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch bb8-to-fastpool in repository https://gitbox.apache.org/repos/asf/opendal.git
commit cabb348023372712d0315f7a96e202f7eed6a35e Author: tison <[email protected]> AuthorDate: Tue Nov 25 19:21:37 2025 +0800 for redis Signed-off-by: tison <[email protected]> --- core/src/services/redis/backend.rs | 38 ++++++------- core/src/services/redis/config.rs | 2 +- core/src/services/redis/core.rs | 111 +++++++++++++++++++------------------ 3 files changed, 73 insertions(+), 78 deletions(-) diff --git a/core/src/services/redis/backend.rs b/core/src/services/redis/backend.rs index 5365422cc..e9b3d58a1 100644 --- a/core/src/services/redis/backend.rs +++ b/core/src/services/redis/backend.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use std::time::Duration; use http::Uri; -use mea::once::OnceCell; use redis::Client; use redis::ConnectionAddr; use redis::ConnectionInfo; @@ -133,7 +132,7 @@ impl RedisBuilder { /// /// Will panic if `max_size` is 0. #[must_use] - pub fn connection_pool_max_size(mut self, max_size: u32) -> Self { + pub fn connection_pool_max_size(mut self, max_size: usize) -> Self { assert!(max_size > 0, "max_size must be greater than zero!"); self.config.connection_pool_max_size = Some(max_size); self @@ -166,16 +165,13 @@ impl Builder for RedisBuilder { } let client = client_builder.build().map_err(format_redis_error)?; - let conn = OnceCell::new(); - - Ok(RedisBackend::new(RedisCore { - addr: endpoints, - client: None, - cluster_client: Some(client), - conn, - default_ttl: self.config.default_ttl, - connection_pool_max_size: self.config.connection_pool_max_size, - }) + Ok(RedisBackend::new(RedisCore::new( + endpoints, + None, + Some(client), + self.config.default_ttl, + self.config.connection_pool_max_size, + )) .with_normalized_root(root)) } else { let endpoint = self @@ -193,15 +189,13 @@ impl Builder for RedisBuilder { .set_source(e) })?; - let conn = OnceCell::new(); - Ok(RedisBackend::new(RedisCore { - addr: endpoint, - client: Some(client), - cluster_client: None, - conn, - default_ttl: self.config.default_ttl, - connection_pool_max_size: self.config.connection_pool_max_size, - }) + Ok(RedisBackend::new(RedisCore::new( + endpoint, + Some(client), + None, + self.config.default_ttl, + self.config.connection_pool_max_size, + )) .with_normalized_root(root)) } } @@ -277,7 +271,7 @@ impl RedisBackend { fn new(core: RedisCore) -> Self { let info = AccessorInfo::default(); info.set_scheme(REDIS_SCHEME); - info.set_name(&core.addr); + info.set_name(core.addr()); info.set_root("/"); info.set_native_capability(Capability { read: true, diff --git a/core/src/services/redis/config.rs b/core/src/services/redis/config.rs index c5d134f80..1f07f4a04 100644 --- a/core/src/services/redis/config.rs +++ b/core/src/services/redis/config.rs @@ -40,7 +40,7 @@ pub struct RedisConfig { /// The maximum number of connections allowed. /// /// default is 10 - pub connection_pool_max_size: Option<u32>, + pub connection_pool_max_size: Option<usize>, /// the username to connect redis service. /// /// default is None diff --git a/core/src/services/redis/core.rs b/core/src/services/redis/core.rs index 21f609f51..f0f5de20a 100644 --- a/core/src/services/redis/core.rs +++ b/core/src/services/redis/core.rs @@ -16,11 +16,12 @@ // under the License. use std::fmt::Debug; +use std::sync::Arc; use std::time::Duration; -use bb8::RunError; +use crate::*; use bytes::Bytes; -use mea::once::OnceCell; +use fastpool::{ManageObject, ObjectStatus, bounded}; use redis::AsyncCommands; use redis::Client; use redis::Cmd; @@ -33,8 +34,6 @@ use redis::aio::ConnectionManager; use redis::cluster::ClusterClient; use redis::cluster_async::ClusterConnection; -use crate::*; - #[derive(Clone)] pub enum RedisConnection { Normal(ConnectionManager), @@ -71,15 +70,15 @@ impl ConnectionLike for RedisConnection { #[derive(Clone)] pub struct RedisConnectionManager { - pub client: Option<Client>, - pub cluster_client: Option<ClusterClient>, + client: Option<Client>, + cluster_client: Option<ClusterClient>, } -impl bb8::ManageConnection for RedisConnectionManager { - type Connection = RedisConnection; +impl ManageObject for RedisConnectionManager { + type Object = RedisConnection; type Error = Error; - async fn connect(&self) -> Result<RedisConnection, Self::Error> { + async fn create(&self) -> Result<RedisConnection, Self::Error> { if let Some(client) = self.client.clone() { ConnectionManager::new(client.clone()) .await @@ -96,30 +95,27 @@ impl bb8::ManageConnection for RedisConnectionManager { } } - async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { - let pong: String = conn.ping().await.map_err(format_redis_error)?; - - if pong == "PONG" { - Ok(()) - } else { - Err(Error::new(ErrorKind::Unexpected, "PING ERROR")) + async fn is_recyclable( + &self, + o: &mut Self::Object, + _: &ObjectStatus, + ) -> Result<(), Self::Error> { + match o.ping::<String>().await { + Ok(ref pong) => match pong.as_bytes() { + b"PONG" => Ok(()), + _ => Err(Error::new(ErrorKind::Unexpected, "PING ERROR")), + }, + Err(err) => Err(format_redis_error(err)), } } - - fn has_broken(&self, _: &mut Self::Connection) -> bool { - false - } } /// RedisCore holds the Redis connection and configuration #[derive(Clone)] pub struct RedisCore { - pub addr: String, - pub client: Option<Client>, - pub cluster_client: Option<ClusterClient>, - pub conn: OnceCell<bb8::Pool<RedisConnectionManager>>, - pub default_ttl: Option<Duration>, - pub connection_pool_max_size: Option<u32>, + addr: String, + conn: Arc<bounded::Pool<RedisConnectionManager>>, + default_ttl: Option<Duration>, } impl Debug for RedisCore { @@ -131,38 +127,43 @@ impl Debug for RedisCore { } impl RedisCore { - pub async fn conn(&self) -> Result<bb8::PooledConnection<'_, RedisConnectionManager>> { - let pool = self - .conn - .get_or_try_init(|| async { - bb8::Pool::builder() - .max_size(self.connection_pool_max_size.unwrap_or(10)) - .build(self.get_redis_connection_manager()) - .await - .map_err(|err| { - Error::new(ErrorKind::ConfigInvalid, "connect to redis failed") - .set_source(err) - }) - }) - .await?; - pool.get().await.map_err(|err| match err { - RunError::TimedOut => { - Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary() - } - RunError::User(err) => err, - }) + pub fn new( + endpoint: String, + client: Option<Client>, + cluster_client: Option<ClusterClient>, + default_ttl: Option<Duration>, + connection_pool_max_size: Option<usize>, + ) -> Self { + let manager = RedisConnectionManager { + client, + cluster_client, + }; + let pool = bounded::Pool::new( + bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)), + manager, + ); + + Self { + addr: endpoint, + conn: pool, + default_ttl, + } + } + + pub fn addr(&self) -> &str { + &self.addr } - pub fn get_redis_connection_manager(&self) -> RedisConnectionManager { - if let Some(_client) = self.client.clone() { - RedisConnectionManager { - client: self.client.clone(), - cluster_client: None, + pub async fn conn(&self) -> Result<bounded::Object<RedisConnectionManager>> { + let fut = self.conn.get(); + + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => { + Err(Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()) } - } else { - RedisConnectionManager { - client: None, - cluster_client: self.cluster_client.clone(), + result = fut => match result { + Ok(conn) => Ok(conn), + Err(err) => Err(err), } } }
