This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 8ae8ea438b feat: add write api for lakefs service. (#5100)
8ae8ea438b is described below
commit 8ae8ea438b8e49fd3660b3b0a326ebad49c75a20
Author: Guangdong Liu <[email protected]>
AuthorDate: Mon Sep 9 10:49:53 2024 +0800
feat: add write api for lakefs service. (#5100)
* 1
* 1
* 1
* 1
* 1
* 1
* 1
* 1
* 1
* 1
* 1
* 1
---
core/src/services/lakefs/backend.rs | 11 ++++++-
core/src/services/lakefs/core.rs | 28 +++++++++++++++++
core/src/services/lakefs/docs.md | 2 +-
core/src/services/lakefs/mod.rs | 4 +++
core/src/services/lakefs/{mod.rs => writer.rs} | 43 +++++++++++++++++++-------
5 files changed, 74 insertions(+), 14 deletions(-)
diff --git a/core/src/services/lakefs/backend.rs b/core/src/services/lakefs/backend.rs
index c5c2816e8e..69e59e395f 100644
--- a/core/src/services/lakefs/backend.rs
+++ b/core/src/services/lakefs/backend.rs
@@ -29,6 +29,7 @@ use super::core::LakefsCore;
use super::core::LakefsStatus;
use super::error::parse_error;
use super::lister::LakefsLister;
+use super::writer::LakefsWriter;
use crate::raw::*;
use crate::services::LakefsConfig;
use crate::*;
@@ -193,7 +194,7 @@ pub struct LakefsBackend {
impl Access for LakefsBackend {
type Reader = HttpBody;
- type Writer = ();
+ type Writer = oio::OneShotWriter<LakefsWriter>;
type Lister = oio::PageLister<LakefsLister>;
type BlockingReader = ();
type BlockingWriter = ();
@@ -206,6 +207,7 @@ impl Access for LakefsBackend {
stat: true,
list: true,
read: true,
+ write: true,
..Default::default()
});
@@ -276,4 +278,11 @@ impl Access for LakefsBackend {
Ok((RpList::default(), oio::PageLister::new(l)))
}
+
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+ Ok((
+ RpWrite::default(),
+ oio::OneShotWriter::new(LakefsWriter::new(self.core.clone(), path.to_string(), args)),
+ ))
+ }
}
diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs
index 84cb1298cf..91aed0e349 100644
--- a/core/src/services/lakefs/core.rs
+++ b/core/src/services/lakefs/core.rs
@@ -143,6 +143,34 @@ impl LakefsCore {
self.client.send(req).await
}
+
+ pub async fn upload_object(
+ &self,
+ path: &str,
+ _args: &OpWrite,
+ body: Buffer,
+ ) -> Result<Response<Buffer>> {
+ let p = build_abs_path(&self.root, path)
+ .trim_end_matches('/')
+ .to_string();
+
+ let url = format!(
+ "{}/api/v1/repositories/{}/branches/{}/objects?path={}",
+ self.endpoint,
+ self.repository,
+ self.branch,
+ percent_encode_path(&p)
+ );
+
+ let mut req = Request::post(&url);
+
+ let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
+ req = req.header(header::AUTHORIZATION, auth_header_content);
+
+ let req = req.body(body).map_err(new_request_build_error)?;
+
+ self.client.send(req).await
+ }
}
#[derive(Deserialize, Eq, PartialEq, Debug)]
diff --git a/core/src/services/lakefs/docs.md b/core/src/services/lakefs/docs.md
index a4589bccf1..892bf66b17 100644
--- a/core/src/services/lakefs/docs.md
+++ b/core/src/services/lakefs/docs.md
@@ -9,7 +9,7 @@ This service can be used to:
- [x] stat
- [x] read
-- [ ] write
+- [x] write
- [ ] create_dir
- [ ] delete
- [ ] copy
diff --git a/core/src/services/lakefs/mod.rs b/core/src/services/lakefs/mod.rs
index 2436a6ef6e..6762ebbfda 100644
--- a/core/src/services/lakefs/mod.rs
+++ b/core/src/services/lakefs/mod.rs
@@ -23,10 +23,14 @@ mod error;
#[cfg(feature = "services-lakefs")]
mod lister;
+#[cfg(feature = "services-lakefs")]
+mod writer;
+
#[cfg(feature = "services-lakefs")]
mod backend;
#[cfg(feature = "services-lakefs")]
pub use backend::LakefsBuilder as Lakefs;
mod config;
+
pub use config::LakefsConfig;
diff --git a/core/src/services/lakefs/mod.rs b/core/src/services/lakefs/writer.rs
similarity index 51%
copy from core/src/services/lakefs/mod.rs
copy to core/src/services/lakefs/writer.rs
index 2436a6ef6e..299cf8bd42 100644
--- a/core/src/services/lakefs/mod.rs
+++ b/core/src/services/lakefs/writer.rs
@@ -15,18 +15,37 @@
// specific language governing permissions and limitations
// under the License.
-#[cfg(feature = "services-lakefs")]
-mod core;
-#[cfg(feature = "services-lakefs")]
-mod error;
+use std::sync::Arc;
-#[cfg(feature = "services-lakefs")]
-mod lister;
+use http::StatusCode;
-#[cfg(feature = "services-lakefs")]
-mod backend;
-#[cfg(feature = "services-lakefs")]
-pub use backend::LakefsBuilder as Lakefs;
+use crate::raw::*;
+use crate::services::lakefs::core::LakefsCore;
+use crate::*;
-mod config;
-pub use config::LakefsConfig;
+use super::error::parse_error;
+
+pub struct LakefsWriter {
+ core: Arc<LakefsCore>,
+ op: OpWrite,
+ path: String,
+}
+
+impl LakefsWriter {
+ pub fn new(core: Arc<LakefsCore>, path: String, op: OpWrite) -> Self {
+ LakefsWriter { core, path, op }
+ }
+}
+
+impl oio::OneShotWrite for LakefsWriter {
+ async fn write_once(&self, bs: Buffer) -> Result<()> {
+ let resp = self.core.upload_object(&self.path, &self.op, bs).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::CREATED | StatusCode::OK => Ok(()),
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+}