This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch bb8-to-fastpool
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit cabb348023372712d0315f7a96e202f7eed6a35e
Author: tison <[email protected]>
AuthorDate: Tue Nov 25 19:21:37 2025 +0800

    for redis
    
    Signed-off-by: tison <[email protected]>
---
 core/src/services/redis/backend.rs |  38 ++++++-------
 core/src/services/redis/config.rs  |   2 +-
 core/src/services/redis/core.rs    | 111 +++++++++++++++++++------------------
 3 files changed, 73 insertions(+), 78 deletions(-)

diff --git a/core/src/services/redis/backend.rs 
b/core/src/services/redis/backend.rs
index 5365422cc..e9b3d58a1 100644
--- a/core/src/services/redis/backend.rs
+++ b/core/src/services/redis/backend.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use http::Uri;
-use mea::once::OnceCell;
 use redis::Client;
 use redis::ConnectionAddr;
 use redis::ConnectionInfo;
@@ -133,7 +132,7 @@ impl RedisBuilder {
     ///
     /// Will panic if `max_size` is 0.
     #[must_use]
-    pub fn connection_pool_max_size(mut self, max_size: u32) -> Self {
+    pub fn connection_pool_max_size(mut self, max_size: usize) -> Self {
         assert!(max_size > 0, "max_size must be greater than zero!");
         self.config.connection_pool_max_size = Some(max_size);
         self
@@ -166,16 +165,13 @@ impl Builder for RedisBuilder {
             }
             let client = client_builder.build().map_err(format_redis_error)?;
 
-            let conn = OnceCell::new();
-
-            Ok(RedisBackend::new(RedisCore {
-                addr: endpoints,
-                client: None,
-                cluster_client: Some(client),
-                conn,
-                default_ttl: self.config.default_ttl,
-                connection_pool_max_size: self.config.connection_pool_max_size,
-            })
+            Ok(RedisBackend::new(RedisCore::new(
+                endpoints,
+                None,
+                Some(client),
+                self.config.default_ttl,
+                self.config.connection_pool_max_size,
+            ))
             .with_normalized_root(root))
         } else {
             let endpoint = self
@@ -193,15 +189,13 @@ impl Builder for RedisBuilder {
                         .set_source(e)
                 })?;
 
-            let conn = OnceCell::new();
-            Ok(RedisBackend::new(RedisCore {
-                addr: endpoint,
-                client: Some(client),
-                cluster_client: None,
-                conn,
-                default_ttl: self.config.default_ttl,
-                connection_pool_max_size: self.config.connection_pool_max_size,
-            })
+            Ok(RedisBackend::new(RedisCore::new(
+                endpoint,
+                Some(client),
+                None,
+                self.config.default_ttl,
+                self.config.connection_pool_max_size,
+            ))
             .with_normalized_root(root))
         }
     }
@@ -277,7 +271,7 @@ impl RedisBackend {
     fn new(core: RedisCore) -> Self {
         let info = AccessorInfo::default();
         info.set_scheme(REDIS_SCHEME);
-        info.set_name(&core.addr);
+        info.set_name(core.addr());
         info.set_root("/");
         info.set_native_capability(Capability {
             read: true,
diff --git a/core/src/services/redis/config.rs 
b/core/src/services/redis/config.rs
index c5d134f80..1f07f4a04 100644
--- a/core/src/services/redis/config.rs
+++ b/core/src/services/redis/config.rs
@@ -40,7 +40,7 @@ pub struct RedisConfig {
     /// The maximum number of connections allowed.
     ///
     /// default is 10
-    pub connection_pool_max_size: Option<u32>,
+    pub connection_pool_max_size: Option<usize>,
     /// the username to connect redis service.
     ///
     /// default is None
diff --git a/core/src/services/redis/core.rs b/core/src/services/redis/core.rs
index 21f609f51..f0f5de20a 100644
--- a/core/src/services/redis/core.rs
+++ b/core/src/services/redis/core.rs
@@ -16,11 +16,12 @@
 // under the License.
 
 use std::fmt::Debug;
+use std::sync::Arc;
 use std::time::Duration;
 
-use bb8::RunError;
+use crate::*;
 use bytes::Bytes;
-use mea::once::OnceCell;
+use fastpool::{ManageObject, ObjectStatus, bounded};
 use redis::AsyncCommands;
 use redis::Client;
 use redis::Cmd;
@@ -33,8 +34,6 @@ use redis::aio::ConnectionManager;
 use redis::cluster::ClusterClient;
 use redis::cluster_async::ClusterConnection;
 
-use crate::*;
-
 #[derive(Clone)]
 pub enum RedisConnection {
     Normal(ConnectionManager),
@@ -71,15 +70,15 @@ impl ConnectionLike for RedisConnection {
 
 #[derive(Clone)]
 pub struct RedisConnectionManager {
-    pub client: Option<Client>,
-    pub cluster_client: Option<ClusterClient>,
+    client: Option<Client>,
+    cluster_client: Option<ClusterClient>,
 }
 
-impl bb8::ManageConnection for RedisConnectionManager {
-    type Connection = RedisConnection;
+impl ManageObject for RedisConnectionManager {
+    type Object = RedisConnection;
     type Error = Error;
 
-    async fn connect(&self) -> Result<RedisConnection, Self::Error> {
+    async fn create(&self) -> Result<RedisConnection, Self::Error> {
         if let Some(client) = self.client.clone() {
             ConnectionManager::new(client.clone())
                 .await
@@ -96,30 +95,27 @@ impl bb8::ManageConnection for RedisConnectionManager {
         }
     }
 
-    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), 
Self::Error> {
-        let pong: String = conn.ping().await.map_err(format_redis_error)?;
-
-        if pong == "PONG" {
-            Ok(())
-        } else {
-            Err(Error::new(ErrorKind::Unexpected, "PING ERROR"))
+    async fn is_recyclable(
+        &self,
+        o: &mut Self::Object,
+        _: &ObjectStatus,
+    ) -> Result<(), Self::Error> {
+        match o.ping::<String>().await {
+            Ok(ref pong) => match pong.as_bytes() {
+                b"PONG" => Ok(()),
+                _ => Err(Error::new(ErrorKind::Unexpected, "PING ERROR")),
+            },
+            Err(err) => Err(format_redis_error(err)),
         }
     }
-
-    fn has_broken(&self, _: &mut Self::Connection) -> bool {
-        false
-    }
 }
 
 /// RedisCore holds the Redis connection and configuration
 #[derive(Clone)]
 pub struct RedisCore {
-    pub addr: String,
-    pub client: Option<Client>,
-    pub cluster_client: Option<ClusterClient>,
-    pub conn: OnceCell<bb8::Pool<RedisConnectionManager>>,
-    pub default_ttl: Option<Duration>,
-    pub connection_pool_max_size: Option<u32>,
+    addr: String,
+    conn: Arc<bounded::Pool<RedisConnectionManager>>,
+    default_ttl: Option<Duration>,
 }
 
 impl Debug for RedisCore {
@@ -131,38 +127,43 @@ impl Debug for RedisCore {
 }
 
 impl RedisCore {
-    pub async fn conn(&self) -> Result<bb8::PooledConnection<'_, 
RedisConnectionManager>> {
-        let pool = self
-            .conn
-            .get_or_try_init(|| async {
-                bb8::Pool::builder()
-                    .max_size(self.connection_pool_max_size.unwrap_or(10))
-                    .build(self.get_redis_connection_manager())
-                    .await
-                    .map_err(|err| {
-                        Error::new(ErrorKind::ConfigInvalid, "connect to redis 
failed")
-                            .set_source(err)
-                    })
-            })
-            .await?;
-        pool.get().await.map_err(|err| match err {
-            RunError::TimedOut => {
-                Error::new(ErrorKind::Unexpected, "get connection from pool 
failed").set_temporary()
-            }
-            RunError::User(err) => err,
-        })
+    pub fn new(
+        endpoint: String,
+        client: Option<Client>,
+        cluster_client: Option<ClusterClient>,
+        default_ttl: Option<Duration>,
+        connection_pool_max_size: Option<usize>,
+    ) -> Self {
+        let manager = RedisConnectionManager {
+            client,
+            cluster_client,
+        };
+        let pool = bounded::Pool::new(
+            bounded::PoolConfig::new(connection_pool_max_size.unwrap_or(10)),
+            manager,
+        );
+
+        Self {
+            addr: endpoint,
+            conn: pool,
+            default_ttl,
+        }
+    }
+
+    pub fn addr(&self) -> &str {
+        &self.addr
     }
 
-    pub fn get_redis_connection_manager(&self) -> RedisConnectionManager {
-        if let Some(_client) = self.client.clone() {
-            RedisConnectionManager {
-                client: self.client.clone(),
-                cluster_client: None,
+    pub async fn conn(&self) -> 
Result<bounded::Object<RedisConnectionManager>> {
+        let fut = self.conn.get();
+
+        tokio::select! {
+            _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                Err(Error::new(ErrorKind::Unexpected, "connection request: 
timeout").set_temporary())
             }
-        } else {
-            RedisConnectionManager {
-                client: None,
-                cluster_client: self.cluster_client.clone(),
+            result = fut => match result {
+                Ok(conn) => Ok(conn),
+                Err(err) => Err(err),
             }
         }
     }

Reply via email to