This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new cee689d83 refactor: Introduce IpmfsCore for improved service structure 
(#5980)
cee689d83 is described below

commit cee689d830bf4a06f07e22f216312c5b6738d247
Author: miro <[email protected]>
AuthorDate: Tue Apr 8 10:19:12 2025 +0800

    refactor: Introduce IpmfsCore for improved service structure (#5980)
    
    * refactor: Introduce IpmfsCore for improved service structure
    
    * refactor: Migrate ipmfs service to context based http client
---
 core/src/services/ipmfs/backend.rs | 160 +++----------------------------------
 core/src/services/ipmfs/builder.rs |  45 ++++++++---
 core/src/services/ipmfs/core.rs    | 142 ++++++++++++++++++++++++++++++++
 core/src/services/ipmfs/delete.rs  |   6 +-
 core/src/services/ipmfs/lister.rs  |  10 +--
 core/src/services/ipmfs/mod.rs     |   2 +
 core/src/services/ipmfs/writer.rs  |  13 +--
 7 files changed, 206 insertions(+), 172 deletions(-)

diff --git a/core/src/services/ipmfs/backend.rs 
b/core/src/services/ipmfs/backend.rs
index fb60ad823..a14fc2535 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -16,16 +16,15 @@
 // under the License.
 
 use std::fmt;
-use std::fmt::Write;
 use std::str;
 use std::sync::Arc;
 
 use bytes::Buf;
-use http::Request;
 use http::Response;
 use http::StatusCode;
 use serde::Deserialize;
 
+use super::core::IpmfsCore;
 use super::delete::IpmfsDeleter;
 use super::error::parse_error;
 use super::lister::IpmfsLister;
@@ -37,54 +36,17 @@ use crate::*;
 #[doc = include_str!("docs.md")]
 #[derive(Clone)]
 pub struct IpmfsBackend {
-    info: Arc<AccessorInfo>,
-    root: String,
-    endpoint: String,
-    client: HttpClient,
+    pub core: Arc<IpmfsCore>,
 }
 
 impl fmt::Debug for IpmfsBackend {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("Backend")
-            .field("root", &self.root)
-            .field("endpoint", &self.endpoint)
+        f.debug_struct("IpmfsBackend")
+            .field("core", &self.core)
             .finish()
     }
 }
 
-impl IpmfsBackend {
-    pub(crate) fn new(root: String, client: HttpClient, endpoint: String) -> 
Self {
-        Self {
-            info: {
-                let am = AccessorInfo::default();
-                am.set_scheme(Scheme::Ipmfs)
-                    .set_root(&root)
-                    .set_native_capability(Capability {
-                        stat: true,
-                        stat_has_content_length: true,
-
-                        read: true,
-
-                        write: true,
-                        delete: true,
-
-                        list: true,
-                        list_has_content_length: true,
-
-                        shared: true,
-
-                        ..Default::default()
-                    });
-
-                am.into()
-            },
-            root,
-            client,
-            endpoint,
-        }
-    }
-}
-
 impl Access for IpmfsBackend {
     type Reader = HttpBody;
     type Writer = oio::OneShotWriter<IpmfsWriter>;
@@ -96,11 +58,11 @@ impl Access for IpmfsBackend {
     type BlockingDeleter = ();
 
     fn info(&self) -> Arc<AccessorInfo> {
-        self.info.clone()
+        self.core.info.clone()
     }
 
     async fn create_dir(&self, path: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
-        let resp = self.ipmfs_mkdir(path).await?;
+        let resp = self.core.ipmfs_mkdir(path).await?;
 
         let status = resp.status();
 
@@ -116,7 +78,7 @@ impl Access for IpmfsBackend {
             return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
         }
 
-        let resp = self.ipmfs_stat(path).await?;
+        let resp = self.core.ipmfs_stat(path).await?;
 
         let status = resp.status();
 
@@ -143,7 +105,7 @@ impl Access for IpmfsBackend {
     }
 
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        let resp = self.ipmfs_read(path, args.range()).await?;
+        let resp = self.core.ipmfs_read(path, args.range()).await?;
 
         let status = resp.status();
 
@@ -162,123 +124,23 @@ impl Access for IpmfsBackend {
     async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
         Ok((
             RpWrite::default(),
-            oio::OneShotWriter::new(IpmfsWriter::new(self.clone(), 
path.to_string())),
+            oio::OneShotWriter::new(IpmfsWriter::new(self.core.clone(), 
path.to_string())),
         ))
     }
 
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
         Ok((
             RpDelete::default(),
-            
oio::OneShotDeleter::new(IpmfsDeleter::new(Arc::new(self.clone()))),
+            oio::OneShotDeleter::new(IpmfsDeleter::new(self.core.clone())),
         ))
     }
 
     async fn list(&self, path: &str, _: OpList) -> Result<(RpList, 
Self::Lister)> {
-        let l = IpmfsLister::new(Arc::new(self.clone()), &self.root, path);
+        let l = IpmfsLister::new(self.core.clone(), &self.core.root, path);
         Ok((RpList::default(), oio::PageLister::new(l)))
     }
 }
 
-impl IpmfsBackend {
-    async fn ipmfs_stat(&self, path: &str) -> Result<Response<Buffer>> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/api/v0/files/stat?arg={}",
-            self.endpoint,
-            percent_encode_path(&p)
-        );
-
-        let req = Request::post(url);
-        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
-        self.client.send(req).await
-    }
-
-    pub async fn ipmfs_read(&self, path: &str, range: BytesRange) -> 
Result<Response<HttpBody>> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let mut url = format!(
-            "{}/api/v0/files/read?arg={}",
-            self.endpoint,
-            percent_encode_path(&p)
-        );
-
-        write!(url, "&offset={}", range.offset()).expect("write into string 
must succeed");
-        if let Some(count) = range.size() {
-            write!(url, "&count={count}").expect("write into string must 
succeed")
-        }
-
-        let req = Request::post(url);
-        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
-        self.client.fetch(req).await
-    }
-
-    pub async fn ipmfs_rm(&self, path: &str) -> Result<Response<Buffer>> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/api/v0/files/rm?arg={}",
-            self.endpoint,
-            percent_encode_path(&p)
-        );
-
-        let req = Request::post(url);
-        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
-        self.client.send(req).await
-    }
-
-    pub(crate) async fn ipmfs_ls(&self, path: &str) -> 
Result<Response<Buffer>> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/api/v0/files/ls?arg={}&long=true",
-            self.endpoint,
-            percent_encode_path(&p)
-        );
-
-        let req = Request::post(url);
-        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
-        self.client.send(req).await
-    }
-
-    async fn ipmfs_mkdir(&self, path: &str) -> Result<Response<Buffer>> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let url = format!(
-            "{}/api/v0/files/mkdir?arg={}&parents=true",
-            self.endpoint,
-            percent_encode_path(&p)
-        );
-
-        let req = Request::post(url);
-        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
-        self.client.send(req).await
-    }
-
-    /// Support write from reader.
-    pub async fn ipmfs_write(&self, path: &str, body: Buffer) -> 
Result<Response<Buffer>> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let url = format!(
-            
"{}/api/v0/files/write?arg={}&parents=true&create=true&truncate=true",
-            self.endpoint,
-            percent_encode_path(&p)
-        );
-
-        let multipart = 
Multipart::new().part(FormDataPart::new("data").content(body));
-
-        let req: http::request::Builder = Request::post(url);
-        let req = multipart.apply(req)?;
-
-        self.client.send(req).await
-    }
-}
-
 #[derive(Deserialize, Default, Debug)]
 #[serde(default)]
 struct IpfsStatResponse {
diff --git a/core/src/services/ipmfs/builder.rs 
b/core/src/services/ipmfs/builder.rs
index 6bb56ebcd..98db81434 100644
--- a/core/src/services/ipmfs/builder.rs
+++ b/core/src/services/ipmfs/builder.rs
@@ -15,15 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use log::debug;
 
 use super::backend::IpmfsBackend;
+use super::core::IpmfsCore;
 use crate::raw::*;
 use crate::services::IpmfsConfig;
 use crate::*;
 
 impl Configurator for IpmfsConfig {
     type Builder = IpmfsBuilder;
+
+    #[allow(deprecated)]
     fn into_builder(self) -> Self::Builder {
         IpmfsBuilder {
             config: self,
@@ -75,6 +80,8 @@ impl Configurator for IpmfsConfig {
 #[derive(Default, Debug)]
 pub struct IpmfsBuilder {
     config: IpmfsConfig,
+
+    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` 
instead")]
     http_client: Option<HttpClient>,
 }
 
@@ -108,6 +115,8 @@ impl IpmfsBuilder {
     ///
     /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
     /// during minor updates.
+    #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client` 
instead")]
+    #[allow(deprecated)]
     pub fn http_client(mut self, client: HttpClient) -> Self {
         self.http_client = Some(client);
         self
@@ -128,15 +137,33 @@ impl Builder for IpmfsBuilder {
             .clone()
             .unwrap_or_else(|| "http://localhost:5001".to_string());
 
-        let client = if let Some(client) = self.http_client {
-            client
-        } else {
-            HttpClient::new().map_err(|err| {
-                err.with_operation("Builder::build")
-                    .with_context("service", Scheme::Ipmfs)
-            })?
-        };
+        let info = AccessorInfo::default();
+        info.set_scheme(Scheme::Ipmfs)
+            .set_root(&root)
+            .set_native_capability(Capability {
+                stat: true,
+                stat_has_content_length: true,
+
+                read: true,
+
+                write: true,
+                delete: true,
+
+                list: true,
+                list_has_content_length: true,
+
+                shared: true,
+
+                ..Default::default()
+            });
+
+        let accessor_info = Arc::new(info);
+        let core = Arc::new(IpmfsCore {
+            info: accessor_info,
+            root: root.to_string(),
+            endpoint: endpoint.to_string(),
+        });
 
-        Ok(IpmfsBackend::new(root, client, endpoint))
+        Ok(IpmfsBackend { core })
     }
 }
diff --git a/core/src/services/ipmfs/core.rs b/core/src/services/ipmfs/core.rs
new file mode 100644
index 000000000..92bb3cfd7
--- /dev/null
+++ b/core/src/services/ipmfs/core.rs
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use http::Request;
+use http::Response;
+
+use crate::raw::*;
+use crate::*;
+
+pub struct IpmfsCore {
+    pub info: Arc<AccessorInfo>,
+    pub root: String,
+    pub endpoint: String,
+}
+
+impl Debug for IpmfsCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("IpmfsCore")
+            .field("root", &self.root)
+            .field("endpoint", &self.endpoint)
+            .finish()
+    }
+}
+
+impl IpmfsCore {
+    pub async fn ipmfs_stat(&self, path: &str) -> Result<Response<Buffer>> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/api/v0/files/stat?arg={}",
+            self.endpoint,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::post(url);
+        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+        self.info.http_client().send(req).await
+    }
+
+    pub async fn ipmfs_read(&self, path: &str, range: BytesRange) -> 
Result<Response<HttpBody>> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let mut url = format!(
+            "{}/api/v0/files/read?arg={}",
+            self.endpoint,
+            percent_encode_path(&p)
+        );
+
+        write!(url, "&offset={}", range.offset()).expect("write into string 
must succeed");
+        if let Some(count) = range.size() {
+            write!(url, "&count={count}").expect("write into string must 
succeed")
+        }
+
+        let req = Request::post(url);
+        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+        self.info.http_client().fetch(req).await
+    }
+
+    pub async fn ipmfs_rm(&self, path: &str) -> Result<Response<Buffer>> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/api/v0/files/rm?arg={}",
+            self.endpoint,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::post(url);
+        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+        self.info.http_client().send(req).await
+    }
+
+    pub(crate) async fn ipmfs_ls(&self, path: &str) -> 
Result<Response<Buffer>> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/api/v0/files/ls?arg={}&long=true",
+            self.endpoint,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::post(url);
+        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+        self.info.http_client().send(req).await
+    }
+
+    pub async fn ipmfs_mkdir(&self, path: &str) -> Result<Response<Buffer>> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/api/v0/files/mkdir?arg={}&parents=true",
+            self.endpoint,
+            percent_encode_path(&p)
+        );
+
+        let req = Request::post(url);
+        let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+        self.info.http_client().send(req).await
+    }
+
+    /// Support write from reader.
+    pub async fn ipmfs_write(&self, path: &str, body: Buffer) -> 
Result<Response<Buffer>> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let url = format!(
+            
"{}/api/v0/files/write?arg={}&parents=true&create=true&truncate=true",
+            self.endpoint,
+            percent_encode_path(&p)
+        );
+
+        let multipart = 
Multipart::new().part(FormDataPart::new("data").content(body));
+
+        let req: http::request::Builder = Request::post(url);
+        let req = multipart.apply(req)?;
+
+        self.info.http_client().send(req).await
+    }
+}
diff --git a/core/src/services/ipmfs/delete.rs 
b/core/src/services/ipmfs/delete.rs
index a4430379d..ddf74f280 100644
--- a/core/src/services/ipmfs/delete.rs
+++ b/core/src/services/ipmfs/delete.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::backend::IpmfsBackend;
+use super::core::IpmfsCore;
 use super::error::parse_error;
 use crate::raw::*;
 use crate::*;
@@ -23,11 +23,11 @@ use http::StatusCode;
 use std::sync::Arc;
 
 pub struct IpmfsDeleter {
-    core: Arc<IpmfsBackend>,
+    core: Arc<IpmfsCore>,
 }
 
 impl IpmfsDeleter {
-    pub fn new(core: Arc<IpmfsBackend>) -> Self {
+    pub fn new(core: Arc<IpmfsCore>) -> Self {
         Self { core }
     }
 }
diff --git a/core/src/services/ipmfs/lister.rs 
b/core/src/services/ipmfs/lister.rs
index 292b0a9f5..6b7c3e790 100644
--- a/core/src/services/ipmfs/lister.rs
+++ b/core/src/services/ipmfs/lister.rs
@@ -21,7 +21,7 @@ use bytes::Buf;
 use http::StatusCode;
 use serde::Deserialize;
 
-use super::backend::IpmfsBackend;
+use super::core::IpmfsCore;
 use super::error::parse_error;
 use crate::raw::*;
 use crate::EntryMode;
@@ -30,15 +30,15 @@ use crate::Metadata;
 use crate::Result;
 
 pub struct IpmfsLister {
-    backend: Arc<IpmfsBackend>,
+    core: Arc<IpmfsCore>,
     root: String,
     path: String,
 }
 
 impl IpmfsLister {
-    pub fn new(backend: Arc<IpmfsBackend>, root: &str, path: &str) -> Self {
+    pub fn new(core: Arc<IpmfsCore>, root: &str, path: &str) -> Self {
         Self {
-            backend,
+            core,
             root: root.to_string(),
             path: path.to_string(),
         }
@@ -47,7 +47,7 @@ impl IpmfsLister {
 
 impl oio::PageList for IpmfsLister {
     async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
-        let resp = self.backend.ipmfs_ls(&self.path).await?;
+        let resp = self.core.ipmfs_ls(&self.path).await?;
 
         if resp.status() != StatusCode::OK {
             let err = parse_error(resp);
diff --git a/core/src/services/ipmfs/mod.rs b/core/src/services/ipmfs/mod.rs
index 4b6e9a5bc..ca70f362b 100644
--- a/core/src/services/ipmfs/mod.rs
+++ b/core/src/services/ipmfs/mod.rs
@@ -18,6 +18,8 @@
 #[cfg(feature = "services-ipmfs")]
 mod backend;
 #[cfg(feature = "services-ipmfs")]
+mod core;
+#[cfg(feature = "services-ipmfs")]
 mod delete;
 #[cfg(feature = "services-ipmfs")]
 mod error;
diff --git a/core/src/services/ipmfs/writer.rs 
b/core/src/services/ipmfs/writer.rs
index 6254a67d8..9ca8b10a3 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -15,28 +15,29 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::sync::Arc;
+
 use http::StatusCode;
 
-use super::backend::IpmfsBackend;
+use super::core::IpmfsCore;
 use super::error::parse_error;
 use crate::raw::*;
 use crate::*;
 
 pub struct IpmfsWriter {
-    backend: IpmfsBackend,
-
+    core: Arc<IpmfsCore>,
     path: String,
 }
 
 impl IpmfsWriter {
-    pub fn new(backend: IpmfsBackend, path: String) -> Self {
-        IpmfsWriter { backend, path }
+    pub fn new(core: Arc<IpmfsCore>, path: String) -> Self {
+        IpmfsWriter { core, path }
     }
 }
 
 impl oio::OneShotWrite for IpmfsWriter {
     async fn write_once(&self, bs: Buffer) -> Result<Metadata> {
-        let resp = self.backend.ipmfs_write(&self.path, bs).await?;
+        let resp = self.core.ipmfs_write(&self.path, bs).await?;
 
         let status = resp.status();
 

Reply via email to