This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new cee689d83 refactor: Introduce IpmfsCore for improved service structure
(#5980)
cee689d83 is described below
commit cee689d830bf4a06f07e22f216312c5b6738d247
Author: miro <[email protected]>
AuthorDate: Tue Apr 8 10:19:12 2025 +0800
refactor: Introduce IpmfsCore for improved service structure (#5980)
* refactor: Introduce IpmfsCore for improved service structure
* refactor: Migrate ipmfs service to context based http client
---
core/src/services/ipmfs/backend.rs | 160 +++----------------------------------
core/src/services/ipmfs/builder.rs | 45 ++++++++---
core/src/services/ipmfs/core.rs | 142 ++++++++++++++++++++++++++++++++
core/src/services/ipmfs/delete.rs | 6 +-
core/src/services/ipmfs/lister.rs | 10 +--
core/src/services/ipmfs/mod.rs | 2 +
core/src/services/ipmfs/writer.rs | 13 +--
7 files changed, 206 insertions(+), 172 deletions(-)
diff --git a/core/src/services/ipmfs/backend.rs
b/core/src/services/ipmfs/backend.rs
index fb60ad823..a14fc2535 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -16,16 +16,15 @@
// under the License.
use std::fmt;
-use std::fmt::Write;
use std::str;
use std::sync::Arc;
use bytes::Buf;
-use http::Request;
use http::Response;
use http::StatusCode;
use serde::Deserialize;
+use super::core::IpmfsCore;
use super::delete::IpmfsDeleter;
use super::error::parse_error;
use super::lister::IpmfsLister;
@@ -37,54 +36,17 @@ use crate::*;
#[doc = include_str!("docs.md")]
#[derive(Clone)]
pub struct IpmfsBackend {
- info: Arc<AccessorInfo>,
- root: String,
- endpoint: String,
- client: HttpClient,
+ pub core: Arc<IpmfsCore>,
}
impl fmt::Debug for IpmfsBackend {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("Backend")
- .field("root", &self.root)
- .field("endpoint", &self.endpoint)
+ f.debug_struct("IpmfsBackend")
+ .field("core", &self.core)
.finish()
}
}
-impl IpmfsBackend {
- pub(crate) fn new(root: String, client: HttpClient, endpoint: String) ->
Self {
- Self {
- info: {
- let am = AccessorInfo::default();
- am.set_scheme(Scheme::Ipmfs)
- .set_root(&root)
- .set_native_capability(Capability {
- stat: true,
- stat_has_content_length: true,
-
- read: true,
-
- write: true,
- delete: true,
-
- list: true,
- list_has_content_length: true,
-
- shared: true,
-
- ..Default::default()
- });
-
- am.into()
- },
- root,
- client,
- endpoint,
- }
- }
-}
-
impl Access for IpmfsBackend {
type Reader = HttpBody;
type Writer = oio::OneShotWriter<IpmfsWriter>;
@@ -96,11 +58,11 @@ impl Access for IpmfsBackend {
type BlockingDeleter = ();
fn info(&self) -> Arc<AccessorInfo> {
- self.info.clone()
+ self.core.info.clone()
}
async fn create_dir(&self, path: &str, _: OpCreateDir) ->
Result<RpCreateDir> {
- let resp = self.ipmfs_mkdir(path).await?;
+ let resp = self.core.ipmfs_mkdir(path).await?;
let status = resp.status();
@@ -116,7 +78,7 @@ impl Access for IpmfsBackend {
return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
}
- let resp = self.ipmfs_stat(path).await?;
+ let resp = self.core.ipmfs_stat(path).await?;
let status = resp.status();
@@ -143,7 +105,7 @@ impl Access for IpmfsBackend {
}
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
- let resp = self.ipmfs_read(path, args.range()).await?;
+ let resp = self.core.ipmfs_read(path, args.range()).await?;
let status = resp.status();
@@ -162,123 +124,23 @@ impl Access for IpmfsBackend {
async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
Ok((
RpWrite::default(),
- oio::OneShotWriter::new(IpmfsWriter::new(self.clone(),
path.to_string())),
+ oio::OneShotWriter::new(IpmfsWriter::new(self.core.clone(),
path.to_string())),
))
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
-
oio::OneShotDeleter::new(IpmfsDeleter::new(Arc::new(self.clone()))),
+ oio::OneShotDeleter::new(IpmfsDeleter::new(self.core.clone())),
))
}
async fn list(&self, path: &str, _: OpList) -> Result<(RpList,
Self::Lister)> {
- let l = IpmfsLister::new(Arc::new(self.clone()), &self.root, path);
+ let l = IpmfsLister::new(self.core.clone(), &self.core.root, path);
Ok((RpList::default(), oio::PageLister::new(l)))
}
}
-impl IpmfsBackend {
- async fn ipmfs_stat(&self, path: &str) -> Result<Response<Buffer>> {
- let p = build_rooted_abs_path(&self.root, path);
-
- let url = format!(
- "{}/api/v0/files/stat?arg={}",
- self.endpoint,
- percent_encode_path(&p)
- );
-
- let req = Request::post(url);
- let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
- self.client.send(req).await
- }
-
- pub async fn ipmfs_read(&self, path: &str, range: BytesRange) ->
Result<Response<HttpBody>> {
- let p = build_rooted_abs_path(&self.root, path);
-
- let mut url = format!(
- "{}/api/v0/files/read?arg={}",
- self.endpoint,
- percent_encode_path(&p)
- );
-
- write!(url, "&offset={}", range.offset()).expect("write into string
must succeed");
- if let Some(count) = range.size() {
- write!(url, "&count={count}").expect("write into string must
succeed")
- }
-
- let req = Request::post(url);
- let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
- self.client.fetch(req).await
- }
-
- pub async fn ipmfs_rm(&self, path: &str) -> Result<Response<Buffer>> {
- let p = build_rooted_abs_path(&self.root, path);
-
- let url = format!(
- "{}/api/v0/files/rm?arg={}",
- self.endpoint,
- percent_encode_path(&p)
- );
-
- let req = Request::post(url);
- let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
- self.client.send(req).await
- }
-
- pub(crate) async fn ipmfs_ls(&self, path: &str) ->
Result<Response<Buffer>> {
- let p = build_rooted_abs_path(&self.root, path);
-
- let url = format!(
- "{}/api/v0/files/ls?arg={}&long=true",
- self.endpoint,
- percent_encode_path(&p)
- );
-
- let req = Request::post(url);
- let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
- self.client.send(req).await
- }
-
- async fn ipmfs_mkdir(&self, path: &str) -> Result<Response<Buffer>> {
- let p = build_rooted_abs_path(&self.root, path);
-
- let url = format!(
- "{}/api/v0/files/mkdir?arg={}&parents=true",
- self.endpoint,
- percent_encode_path(&p)
- );
-
- let req = Request::post(url);
- let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
-
- self.client.send(req).await
- }
-
- /// Support write from reader.
- pub async fn ipmfs_write(&self, path: &str, body: Buffer) ->
Result<Response<Buffer>> {
- let p = build_rooted_abs_path(&self.root, path);
-
- let url = format!(
-
"{}/api/v0/files/write?arg={}&parents=true&create=true&truncate=true",
- self.endpoint,
- percent_encode_path(&p)
- );
-
- let multipart =
Multipart::new().part(FormDataPart::new("data").content(body));
-
- let req: http::request::Builder = Request::post(url);
- let req = multipart.apply(req)?;
-
- self.client.send(req).await
- }
-}
-
#[derive(Deserialize, Default, Debug)]
#[serde(default)]
struct IpfsStatResponse {
diff --git a/core/src/services/ipmfs/builder.rs
b/core/src/services/ipmfs/builder.rs
index 6bb56ebcd..98db81434 100644
--- a/core/src/services/ipmfs/builder.rs
+++ b/core/src/services/ipmfs/builder.rs
@@ -15,15 +15,20 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
use log::debug;
use super::backend::IpmfsBackend;
+use super::core::IpmfsCore;
use crate::raw::*;
use crate::services::IpmfsConfig;
use crate::*;
impl Configurator for IpmfsConfig {
type Builder = IpmfsBuilder;
+
+ #[allow(deprecated)]
fn into_builder(self) -> Self::Builder {
IpmfsBuilder {
config: self,
@@ -75,6 +80,8 @@ impl Configurator for IpmfsConfig {
#[derive(Default, Debug)]
pub struct IpmfsBuilder {
config: IpmfsConfig,
+
+ #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client`
instead")]
http_client: Option<HttpClient>,
}
@@ -108,6 +115,8 @@ impl IpmfsBuilder {
///
/// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
/// during minor updates.
+ #[deprecated(since = "0.53.0", note = "Use `Operator::update_http_client`
instead")]
+ #[allow(deprecated)]
pub fn http_client(mut self, client: HttpClient) -> Self {
self.http_client = Some(client);
self
@@ -128,15 +137,33 @@ impl Builder for IpmfsBuilder {
.clone()
.unwrap_or_else(|| "http://localhost:5001".to_string());
- let client = if let Some(client) = self.http_client {
- client
- } else {
- HttpClient::new().map_err(|err| {
- err.with_operation("Builder::build")
- .with_context("service", Scheme::Ipmfs)
- })?
- };
+ let info = AccessorInfo::default();
+ info.set_scheme(Scheme::Ipmfs)
+ .set_root(&root)
+ .set_native_capability(Capability {
+ stat: true,
+ stat_has_content_length: true,
+
+ read: true,
+
+ write: true,
+ delete: true,
+
+ list: true,
+ list_has_content_length: true,
+
+ shared: true,
+
+ ..Default::default()
+ });
+
+ let accessor_info = Arc::new(info);
+ let core = Arc::new(IpmfsCore {
+ info: accessor_info,
+ root: root.to_string(),
+ endpoint: endpoint.to_string(),
+ });
- Ok(IpmfsBackend::new(root, client, endpoint))
+ Ok(IpmfsBackend { core })
}
}
diff --git a/core/src/services/ipmfs/core.rs b/core/src/services/ipmfs/core.rs
new file mode 100644
index 000000000..92bb3cfd7
--- /dev/null
+++ b/core/src/services/ipmfs/core.rs
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::fmt::Write;
+use std::sync::Arc;
+
+use http::Request;
+use http::Response;
+
+use crate::raw::*;
+use crate::*;
+
+pub struct IpmfsCore {
+ pub info: Arc<AccessorInfo>,
+ pub root: String,
+ pub endpoint: String,
+}
+
+impl Debug for IpmfsCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("IpmfsCore")
+ .field("root", &self.root)
+ .field("endpoint", &self.endpoint)
+ .finish()
+ }
+}
+
+impl IpmfsCore {
+ pub async fn ipmfs_stat(&self, path: &str) -> Result<Response<Buffer>> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/api/v0/files/stat?arg={}",
+ self.endpoint,
+ percent_encode_path(&p)
+ );
+
+ let req = Request::post(url);
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+ self.info.http_client().send(req).await
+ }
+
+ pub async fn ipmfs_read(&self, path: &str, range: BytesRange) ->
Result<Response<HttpBody>> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ let mut url = format!(
+ "{}/api/v0/files/read?arg={}",
+ self.endpoint,
+ percent_encode_path(&p)
+ );
+
+ write!(url, "&offset={}", range.offset()).expect("write into string
must succeed");
+ if let Some(count) = range.size() {
+ write!(url, "&count={count}").expect("write into string must
succeed")
+ }
+
+ let req = Request::post(url);
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+ self.info.http_client().fetch(req).await
+ }
+
+ pub async fn ipmfs_rm(&self, path: &str) -> Result<Response<Buffer>> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/api/v0/files/rm?arg={}",
+ self.endpoint,
+ percent_encode_path(&p)
+ );
+
+ let req = Request::post(url);
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+ self.info.http_client().send(req).await
+ }
+
+ pub(crate) async fn ipmfs_ls(&self, path: &str) ->
Result<Response<Buffer>> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/api/v0/files/ls?arg={}&long=true",
+ self.endpoint,
+ percent_encode_path(&p)
+ );
+
+ let req = Request::post(url);
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+ self.info.http_client().send(req).await
+ }
+
+ pub async fn ipmfs_mkdir(&self, path: &str) -> Result<Response<Buffer>> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ let url = format!(
+ "{}/api/v0/files/mkdir?arg={}&parents=true",
+ self.endpoint,
+ percent_encode_path(&p)
+ );
+
+ let req = Request::post(url);
+ let req = req.body(Buffer::new()).map_err(new_request_build_error)?;
+
+ self.info.http_client().send(req).await
+ }
+
+ /// Support write from reader.
+ pub async fn ipmfs_write(&self, path: &str, body: Buffer) ->
Result<Response<Buffer>> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ let url = format!(
+
"{}/api/v0/files/write?arg={}&parents=true&create=true&truncate=true",
+ self.endpoint,
+ percent_encode_path(&p)
+ );
+
+ let multipart =
Multipart::new().part(FormDataPart::new("data").content(body));
+
+ let req: http::request::Builder = Request::post(url);
+ let req = multipart.apply(req)?;
+
+ self.info.http_client().send(req).await
+ }
+}
diff --git a/core/src/services/ipmfs/delete.rs
b/core/src/services/ipmfs/delete.rs
index a4430379d..ddf74f280 100644
--- a/core/src/services/ipmfs/delete.rs
+++ b/core/src/services/ipmfs/delete.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use super::backend::IpmfsBackend;
+use super::core::IpmfsCore;
use super::error::parse_error;
use crate::raw::*;
use crate::*;
@@ -23,11 +23,11 @@ use http::StatusCode;
use std::sync::Arc;
pub struct IpmfsDeleter {
- core: Arc<IpmfsBackend>,
+ core: Arc<IpmfsCore>,
}
impl IpmfsDeleter {
- pub fn new(core: Arc<IpmfsBackend>) -> Self {
+ pub fn new(core: Arc<IpmfsCore>) -> Self {
Self { core }
}
}
diff --git a/core/src/services/ipmfs/lister.rs
b/core/src/services/ipmfs/lister.rs
index 292b0a9f5..6b7c3e790 100644
--- a/core/src/services/ipmfs/lister.rs
+++ b/core/src/services/ipmfs/lister.rs
@@ -21,7 +21,7 @@ use bytes::Buf;
use http::StatusCode;
use serde::Deserialize;
-use super::backend::IpmfsBackend;
+use super::core::IpmfsCore;
use super::error::parse_error;
use crate::raw::*;
use crate::EntryMode;
@@ -30,15 +30,15 @@ use crate::Metadata;
use crate::Result;
pub struct IpmfsLister {
- backend: Arc<IpmfsBackend>,
+ core: Arc<IpmfsCore>,
root: String,
path: String,
}
impl IpmfsLister {
- pub fn new(backend: Arc<IpmfsBackend>, root: &str, path: &str) -> Self {
+ pub fn new(core: Arc<IpmfsCore>, root: &str, path: &str) -> Self {
Self {
- backend,
+ core,
root: root.to_string(),
path: path.to_string(),
}
@@ -47,7 +47,7 @@ impl IpmfsLister {
impl oio::PageList for IpmfsLister {
async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
- let resp = self.backend.ipmfs_ls(&self.path).await?;
+ let resp = self.core.ipmfs_ls(&self.path).await?;
if resp.status() != StatusCode::OK {
let err = parse_error(resp);
diff --git a/core/src/services/ipmfs/mod.rs b/core/src/services/ipmfs/mod.rs
index 4b6e9a5bc..ca70f362b 100644
--- a/core/src/services/ipmfs/mod.rs
+++ b/core/src/services/ipmfs/mod.rs
@@ -18,6 +18,8 @@
#[cfg(feature = "services-ipmfs")]
mod backend;
#[cfg(feature = "services-ipmfs")]
+mod core;
+#[cfg(feature = "services-ipmfs")]
mod delete;
#[cfg(feature = "services-ipmfs")]
mod error;
diff --git a/core/src/services/ipmfs/writer.rs
b/core/src/services/ipmfs/writer.rs
index 6254a67d8..9ca8b10a3 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -15,28 +15,29 @@
// specific language governing permissions and limitations
// under the License.
+use std::sync::Arc;
+
use http::StatusCode;
-use super::backend::IpmfsBackend;
+use super::core::IpmfsCore;
use super::error::parse_error;
use crate::raw::*;
use crate::*;
pub struct IpmfsWriter {
- backend: IpmfsBackend,
-
+ core: Arc<IpmfsCore>,
path: String,
}
impl IpmfsWriter {
- pub fn new(backend: IpmfsBackend, path: String) -> Self {
- IpmfsWriter { backend, path }
+ pub fn new(core: Arc<IpmfsCore>, path: String) -> Self {
+ IpmfsWriter { core, path }
}
}
impl oio::OneShotWrite for IpmfsWriter {
async fn write_once(&self, bs: Buffer) -> Result<Metadata> {
- let resp = self.backend.ipmfs_write(&self.path, bs).await?;
+ let resp = self.core.ipmfs_write(&self.path, bs).await?;
let status = resp.status();