This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new f94e1f3b8 refactor: migrate memcached service from adapter::kv to impl
Access directly (#6714)
f94e1f3b8 is described below
commit f94e1f3b8eb080473b4d730adc9245d18c1d2cce
Author: Qinxuan Chen <[email protected]>
AuthorDate: Tue Oct 21 21:38:23 2025 +0800
refactor: migrate memcached service from adapter::kv to impl Access
directly (#6714)
* refactor: migrate memcached service from adapter::kv to impl Access
directly
* adjust some imports
---
core/src/services/memcached/backend.rs | 184 +++++++++------------
core/src/services/memcached/core.rs | 132 +++++++++++++++
core/src/services/memcached/{mod.rs => deleter.rs} | 28 +++-
core/src/services/memcached/docs.md | 5 +-
core/src/services/memcached/mod.rs | 5 +-
core/src/services/memcached/writer.rs | 59 +++++++
6 files changed, 294 insertions(+), 119 deletions(-)
diff --git a/core/src/services/memcached/backend.rs
b/core/src/services/memcached/backend.rs
index 5cdde5a0d..7c7260084 100644
--- a/core/src/services/memcached/backend.rs
+++ b/core/src/services/memcached/backend.rs
@@ -15,16 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
use std::time::Duration;
-use bb8::RunError;
-use tokio::net::TcpStream;
use tokio::sync::OnceCell;
-use super::binary;
-use crate::raw::adapters::kv;
+use super::config::MemcachedConfig;
+use super::core::*;
+use super::deleter::MemcachedDeleter;
+use super::writer::MemcachedWriter;
use crate::raw::*;
-use crate::services::MemcachedConfig;
use crate::*;
/// [Memcached](https://memcached.org/) service support.
@@ -138,11 +138,11 @@ impl Builder for MemcachedBuilder {
);
let conn = OnceCell::new();
- Ok(MemcachedBackend::new(Adapter {
+ Ok(MemcachedBackend::new(MemcachedCore {
+ conn,
endpoint,
username: self.config.username.clone(),
password: self.config.password.clone(),
- conn,
default_ttl: self.config.default_ttl,
})
.with_normalized_root(root))
@@ -150,128 +150,92 @@ impl Builder for MemcachedBuilder {
}
/// Backend for memcached services.
-pub type MemcachedBackend = kv::Backend<Adapter>;
-
#[derive(Clone, Debug)]
-pub struct Adapter {
- endpoint: String,
- username: Option<String>,
- password: Option<String>,
- default_ttl: Option<Duration>,
- conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
+pub struct MemcachedBackend {
+ core: Arc<MemcachedCore>,
+ root: String,
+ info: Arc<AccessorInfo>,
}
-impl Adapter {
- async fn conn(&self) -> Result<bb8::PooledConnection<'_,
MemcacheConnectionManager>> {
- let pool = self
- .conn
- .get_or_try_init(|| async {
- let mgr = MemcacheConnectionManager::new(
- &self.endpoint,
- self.username.clone(),
- self.password.clone(),
- );
-
- bb8::Pool::builder().build(mgr).await.map_err(|err| {
- Error::new(ErrorKind::ConfigInvalid, "connect to
memecached failed")
- .set_source(err)
- })
- })
- .await?;
+impl MemcachedBackend {
+ pub fn new(core: MemcachedCore) -> Self {
+ let info = AccessorInfo::default();
+ info.set_scheme(Scheme::Memcached.into_static());
+ info.set_name("memcached");
+ info.set_root("/");
+ info.set_native_capability(Capability {
+ read: true,
+ stat: true,
+ write: true,
+ write_can_empty: true,
+ delete: true,
+ shared: true,
+ ..Default::default()
+ });
- pool.get().await.map_err(|err| match err {
- RunError::TimedOut => {
- Error::new(ErrorKind::Unexpected, "get connection from pool
failed").set_temporary()
- }
- RunError::User(err) => err,
- })
- }
-}
-
-impl kv::Adapter for Adapter {
- type Scanner = ();
-
- fn info(&self) -> kv::Info {
- kv::Info::new(
- Scheme::Memcached,
- "memcached",
- Capability {
- read: true,
- write: true,
- shared: true,
-
- ..Default::default()
- },
- )
- }
-
- async fn get(&self, key: &str) -> Result<Option<Buffer>> {
- let mut conn = self.conn().await?;
- let result = conn.get(&percent_encode_path(key)).await?;
- Ok(result.map(Buffer::from))
+ Self {
+ core: Arc::new(core),
+ root: "/".to_string(),
+ info: Arc::new(info),
+ }
}
- async fn set(&self, key: &str, value: Buffer) -> Result<()> {
- let mut conn = self.conn().await?;
-
- conn.set(
- &percent_encode_path(key),
- &value.to_vec(),
- // Set expiration to 0 if ttl not set.
- self.default_ttl
- .map(|v| v.as_secs() as u32)
- .unwrap_or_default(),
- )
- .await
+ fn with_normalized_root(mut self, root: String) -> Self {
+ self.info.set_root(&root);
+ self.root = root;
+ self
}
+}
- async fn delete(&self, key: &str) -> Result<()> {
- let mut conn = self.conn().await?;
+impl Access for MemcachedBackend {
+ type Reader = Buffer;
+ type Writer = MemcachedWriter;
+ type Lister = ();
+ type Deleter = oio::OneShotDeleter<MemcachedDeleter>;
- conn.delete(&percent_encode_path(key)).await
+ fn info(&self) -> Arc<AccessorInfo> {
+ self.info.clone()
}
-}
-/// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`.
-#[derive(Clone, Debug)]
-struct MemcacheConnectionManager {
- address: String,
- username: Option<String>,
- password: Option<String>,
-}
+ async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+ let p = build_abs_path(&self.root, path);
-impl MemcacheConnectionManager {
- fn new(address: &str, username: Option<String>, password: Option<String>)
-> Self {
- Self {
- address: address.to_string(),
- username,
- password,
+ if p == build_abs_path(&self.root, "") {
+ Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+ } else {
+ let bs = self.core.get(&p).await?;
+ match bs {
+ Some(bs) => Ok(RpStat::new(
+
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+ )),
+ None => Err(Error::new(ErrorKind::NotFound, "kv not found in
memcached")),
+ }
}
}
-}
-
-impl bb8::ManageConnection for MemcacheConnectionManager {
- type Connection = binary::Connection;
- type Error = Error;
- /// TODO: Implement unix stream support.
- async fn connect(&self) -> Result<Self::Connection, Self::Error> {
- let conn = TcpStream::connect(&self.address)
- .await
- .map_err(new_std_io_error)?;
- let mut conn = binary::Connection::new(conn);
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let p = build_abs_path(&self.root, path);
+ let bs = match self.core.get(&p).await? {
+ Some(bs) => bs,
+ None => return Err(Error::new(ErrorKind::NotFound, "kv not found
in memcached")),
+ };
+ Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
+ }
- if let (Some(username), Some(password)) = (self.username.as_ref(),
self.password.as_ref()) {
- conn.auth(username, password).await?;
- }
- Ok(conn)
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ let p = build_abs_path(&self.root, path);
+ Ok((RpWrite::new(), MemcachedWriter::new(self.core.clone(), p)))
}
- async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(),
Self::Error> {
- conn.version().await.map(|_| ())
+ async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+ Ok((
+ RpDelete::default(),
+ oio::OneShotDeleter::new(MemcachedDeleter::new(self.core.clone(),
self.root.clone())),
+ ))
}
- fn has_broken(&self, _: &mut Self::Connection) -> bool {
- false
+ async fn list(&self, path: &str, _: OpList) -> Result<(RpList,
Self::Lister)> {
+ let _ = build_abs_path(&self.root, path);
+ Ok((RpList::default(), ()))
}
}
diff --git a/core/src/services/memcached/core.rs
b/core/src/services/memcached/core.rs
new file mode 100644
index 000000000..82c21584d
--- /dev/null
+++ b/core/src/services/memcached/core.rs
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::time::Duration;
+
+use bb8::RunError;
+use tokio::net::TcpStream;
+use tokio::sync::OnceCell;
+
+use super::binary;
+use crate::raw::*;
+use crate::*;
+
+/// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`.
+#[derive(Clone, Debug)]
+pub struct MemcacheConnectionManager {
+ address: String,
+ username: Option<String>,
+ password: Option<String>,
+}
+
+impl MemcacheConnectionManager {
+ fn new(address: &str, username: Option<String>, password: Option<String>)
-> Self {
+ Self {
+ address: address.to_string(),
+ username,
+ password,
+ }
+ }
+}
+
+impl bb8::ManageConnection for MemcacheConnectionManager {
+ type Connection = binary::Connection;
+ type Error = Error;
+
+ /// TODO: Implement unix stream support.
+ async fn connect(&self) -> Result<Self::Connection, Self::Error> {
+ let conn = TcpStream::connect(&self.address)
+ .await
+ .map_err(new_std_io_error)?;
+ let mut conn = binary::Connection::new(conn);
+
+ if let (Some(username), Some(password)) = (self.username.as_ref(),
self.password.as_ref()) {
+ conn.auth(username, password).await?;
+ }
+ Ok(conn)
+ }
+
+ async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(),
Self::Error> {
+ conn.version().await.map(|_| ())
+ }
+
+ fn has_broken(&self, _: &mut Self::Connection) -> bool {
+ false
+ }
+}
+
+#[derive(Clone, Debug)]
+pub struct MemcachedCore {
+ pub conn: OnceCell<bb8::Pool<MemcacheConnectionManager>>,
+ pub endpoint: String,
+ pub username: Option<String>,
+ pub password: Option<String>,
+ pub default_ttl: Option<Duration>,
+}
+
+impl MemcachedCore {
+ async fn conn(&self) -> Result<bb8::PooledConnection<'_,
MemcacheConnectionManager>> {
+ let pool = self
+ .conn
+ .get_or_try_init(|| async {
+ let mgr = MemcacheConnectionManager::new(
+ &self.endpoint,
+ self.username.clone(),
+ self.password.clone(),
+ );
+
+ bb8::Pool::builder().build(mgr).await.map_err(|err| {
+ Error::new(ErrorKind::ConfigInvalid, "connect to
memecached failed")
+ .set_source(err)
+ })
+ })
+ .await?;
+
+ pool.get().await.map_err(|err| match err {
+ RunError::TimedOut => {
+ Error::new(ErrorKind::Unexpected, "get connection from pool
failed").set_temporary()
+ }
+ RunError::User(err) => err,
+ })
+ }
+
+ pub async fn get(&self, key: &str) -> Result<Option<Buffer>> {
+ let mut conn = self.conn().await?;
+ let result = conn.get(&percent_encode_path(key)).await?;
+ Ok(result.map(Buffer::from))
+ }
+
+ pub async fn set(&self, key: &str, value: Buffer) -> Result<()> {
+ let mut conn = self.conn().await?;
+
+ conn.set(
+ &percent_encode_path(key),
+ &value.to_vec(),
+ // Set expiration to 0 if ttl not set.
+ self.default_ttl
+ .map(|v| v.as_secs() as u32)
+ .unwrap_or_default(),
+ )
+ .await
+ }
+
+ pub async fn delete(&self, key: &str) -> Result<()> {
+ let mut conn = self.conn().await?;
+
+ conn.delete(&percent_encode_path(key)).await
+ }
+}
diff --git a/core/src/services/memcached/mod.rs
b/core/src/services/memcached/deleter.rs
similarity index 60%
copy from core/src/services/memcached/mod.rs
copy to core/src/services/memcached/deleter.rs
index 4cb6779c7..725863711 100644
--- a/core/src/services/memcached/mod.rs
+++ b/core/src/services/memcached/deleter.rs
@@ -15,10 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-mod binary;
+use std::sync::Arc;
-mod backend;
-pub use backend::MemcachedBuilder as Memcached;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
-mod config;
-pub use config::MemcachedConfig;
+pub struct MemcachedDeleter {
+ core: Arc<MemcachedCore>,
+ root: String,
+}
+
+impl MemcachedDeleter {
+ pub fn new(core: Arc<MemcachedCore>, root: String) -> Self {
+ Self { core, root }
+ }
+}
+
+impl oio::OneShotDelete for MemcachedDeleter {
+ async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+ let p = build_abs_path(&self.root, &path);
+ self.core.delete(&p).await?;
+ Ok(())
+ }
+}
diff --git a/core/src/services/memcached/docs.md
b/core/src/services/memcached/docs.md
index 40de9df81..f643930fa 100644
--- a/core/src/services/memcached/docs.md
+++ b/core/src/services/memcached/docs.md
@@ -2,16 +2,15 @@
This service can be used to:
+- [ ] create_dir
- [x] stat
- [x] read
- [x] write
-- [ ] create_dir
- [x] delete
- [ ] copy
- [ ] rename
-- [ ] ~~list~~
+- [ ] list
- [ ] ~~presign~~
-- [ ] blocking
## Configuration
diff --git a/core/src/services/memcached/mod.rs
b/core/src/services/memcached/mod.rs
index 4cb6779c7..bd29318fb 100644
--- a/core/src/services/memcached/mod.rs
+++ b/core/src/services/memcached/mod.rs
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+mod backend;
mod binary;
+mod core;
+mod deleter;
+mod writer;
-mod backend;
pub use backend::MemcachedBuilder as Memcached;
mod config;
diff --git a/core/src/services/memcached/writer.rs
b/core/src/services/memcached/writer.rs
new file mode 100644
index 000000000..bb2e843b3
--- /dev/null
+++ b/core/src/services/memcached/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct MemcachedWriter {
+ core: Arc<MemcachedCore>,
+ path: String,
+ buffer: oio::QueueBuf,
+}
+
+impl MemcachedWriter {
+ pub fn new(core: Arc<MemcachedCore>, path: String) -> Self {
+ Self {
+ core,
+ path,
+ buffer: oio::QueueBuf::new(),
+ }
+ }
+}
+
+impl oio::Write for MemcachedWriter {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
+ self.buffer.push(bs);
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<Metadata> {
+ let buf = self.buffer.clone().collect();
+ let length = buf.len() as u64;
+ self.core.set(&self.path, buf).await?;
+
+ let meta =
Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+ Ok(meta)
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ self.buffer.clear();
+ Ok(())
+ }
+}