This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ae8ea438b feat: add write api for lakefs service. (#5100)
8ae8ea438b is described below

commit 8ae8ea438b8e49fd3660b3b0a326ebad49c75a20
Author: Guangdong Liu <[email protected]>
AuthorDate: Mon Sep 9 10:49:53 2024 +0800

    feat: add write api for lakefs service. (#5100)
    
    * 1
    
    * 1
    
    * 1
    
    * 1
    
    * 1
    
    * 1
    
    * 1
    
    * 1
    
    * 1
    
    * 1
    
    * 1
    
    * 1
---
 core/src/services/lakefs/backend.rs            | 11 ++++++-
 core/src/services/lakefs/core.rs               | 28 +++++++++++++++++
 core/src/services/lakefs/docs.md               |  2 +-
 core/src/services/lakefs/mod.rs                |  4 +++
 core/src/services/lakefs/{mod.rs => writer.rs} | 43 +++++++++++++++++++-------
 5 files changed, 74 insertions(+), 14 deletions(-)

diff --git a/core/src/services/lakefs/backend.rs b/core/src/services/lakefs/backend.rs
index c5c2816e8e..69e59e395f 100644
--- a/core/src/services/lakefs/backend.rs
+++ b/core/src/services/lakefs/backend.rs
@@ -29,6 +29,7 @@ use super::core::LakefsCore;
 use super::core::LakefsStatus;
 use super::error::parse_error;
 use super::lister::LakefsLister;
+use super::writer::LakefsWriter;
 use crate::raw::*;
 use crate::services::LakefsConfig;
 use crate::*;
@@ -193,7 +194,7 @@ pub struct LakefsBackend {
 
 impl Access for LakefsBackend {
     type Reader = HttpBody;
-    type Writer = ();
+    type Writer = oio::OneShotWriter<LakefsWriter>;
     type Lister = oio::PageLister<LakefsLister>;
     type BlockingReader = ();
     type BlockingWriter = ();
@@ -206,6 +207,7 @@ impl Access for LakefsBackend {
                 stat: true,
                 list: true,
                 read: true,
+                write: true,
 
                 ..Default::default()
             });
@@ -276,4 +278,11 @@ impl Access for LakefsBackend {
 
         Ok((RpList::default(), oio::PageLister::new(l)))
     }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        Ok((
+            RpWrite::default(),
+            oio::OneShotWriter::new(LakefsWriter::new(self.core.clone(), path.to_string(), args)),
+        ))
+    }
 }
diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs
index 84cb1298cf..91aed0e349 100644
--- a/core/src/services/lakefs/core.rs
+++ b/core/src/services/lakefs/core.rs
@@ -143,6 +143,34 @@ impl LakefsCore {
 
         self.client.send(req).await
     }
+
+    pub async fn upload_object(
+        &self,
+        path: &str,
+        _args: &OpWrite,
+        body: Buffer,
+    ) -> Result<Response<Buffer>> {
+        let p = build_abs_path(&self.root, path)
+            .trim_end_matches('/')
+            .to_string();
+
+        let url = format!(
+            "{}/api/v1/repositories/{}/branches/{}/objects?path={}",
+            self.endpoint,
+            self.repository,
+            self.branch,
+            percent_encode_path(&p)
+        );
+
+        let mut req = Request::post(&url);
+
+        let auth_header_content = format_authorization_by_basic(&self.username, &self.password)?;
+        req = req.header(header::AUTHORIZATION, auth_header_content);
+
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        self.client.send(req).await
+    }
 }
 
 #[derive(Deserialize, Eq, PartialEq, Debug)]
diff --git a/core/src/services/lakefs/docs.md b/core/src/services/lakefs/docs.md
index a4589bccf1..892bf66b17 100644
--- a/core/src/services/lakefs/docs.md
+++ b/core/src/services/lakefs/docs.md
@@ -9,7 +9,7 @@ This service can be used to:
 
 - [x] stat
 - [x] read
-- [ ] write
+- [x] write
 - [ ] create_dir
 - [ ] delete
 - [ ] copy
diff --git a/core/src/services/lakefs/mod.rs b/core/src/services/lakefs/mod.rs
index 2436a6ef6e..6762ebbfda 100644
--- a/core/src/services/lakefs/mod.rs
+++ b/core/src/services/lakefs/mod.rs
@@ -23,10 +23,14 @@ mod error;
 #[cfg(feature = "services-lakefs")]
 mod lister;
 
+#[cfg(feature = "services-lakefs")]
+mod writer;
+
 #[cfg(feature = "services-lakefs")]
 mod backend;
 #[cfg(feature = "services-lakefs")]
 pub use backend::LakefsBuilder as Lakefs;
 
 mod config;
+
 pub use config::LakefsConfig;
diff --git a/core/src/services/lakefs/mod.rs b/core/src/services/lakefs/writer.rs
similarity index 51%
copy from core/src/services/lakefs/mod.rs
copy to core/src/services/lakefs/writer.rs
index 2436a6ef6e..299cf8bd42 100644
--- a/core/src/services/lakefs/mod.rs
+++ b/core/src/services/lakefs/writer.rs
@@ -15,18 +15,37 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#[cfg(feature = "services-lakefs")]
-mod core;
-#[cfg(feature = "services-lakefs")]
-mod error;
+use std::sync::Arc;
 
-#[cfg(feature = "services-lakefs")]
-mod lister;
+use http::StatusCode;
 
-#[cfg(feature = "services-lakefs")]
-mod backend;
-#[cfg(feature = "services-lakefs")]
-pub use backend::LakefsBuilder as Lakefs;
+use crate::raw::*;
+use crate::services::lakefs::core::LakefsCore;
+use crate::*;
 
-mod config;
-pub use config::LakefsConfig;
+use super::error::parse_error;
+
+pub struct LakefsWriter {
+    core: Arc<LakefsCore>,
+    op: OpWrite,
+    path: String,
+}
+
+impl LakefsWriter {
+    pub fn new(core: Arc<LakefsCore>, path: String, op: OpWrite) -> Self {
+        LakefsWriter { core, path, op }
+    }
+}
+
+impl oio::OneShotWrite for LakefsWriter {
+    async fn write_once(&self, bs: Buffer) -> Result<()> {
+        let resp = self.core.upload_object(&self.path, &self.op, bs).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::CREATED | StatusCode::OK => Ok(()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}

Reply via email to