This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new b31a58438 feat: oss multipart uploads write (#2723)
b31a58438 is described below

commit b31a58438c591af63bcec3cf7d67e1ef30f8ac1e
Author: parkma99 <[email protected]>
AuthorDate: Thu Jul 27 23:09:09 2023 +0800

    feat: oss multipart uploads write (#2723)
---
 core/src/services/oss/backend.rs |   3 +-
 core/src/services/oss/core.rs    |  58 ++++++------
 core/src/services/oss/writer.rs  | 184 ++++++++++++++++-----------------------
 3 files changed, 108 insertions(+), 137 deletions(-)

diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs
index 370c7d2e7..14cc08c2f 100644
--- a/core/src/services/oss/backend.rs
+++ b/core/src/services/oss/backend.rs
@@ -386,7 +386,7 @@ pub struct OssBackend {
 impl Accessor for OssBackend {
     type Reader = IncomingAsyncBody;
     type BlockingReader = ();
-    type Writer = OssWriter;
+    type Writer = oio::MultipartUploadWriter<OssWriter>;
     type BlockingWriter = ();
     type Appender = OssAppender;
     type Pager = OssPager;
@@ -412,6 +412,7 @@ impl Accessor for OssBackend {
                 write_can_sink: true,
                 write_with_cache_control: true,
                 write_with_content_type: true,
+                write_with_content_disposition: true,
                 write_without_content_length: true,
                 delete: true,
                 create_dir: true,
diff --git a/core/src/services/oss/core.rs b/core/src/services/oss/core.rs
index da2628a3a..11279f9e1 100644
--- a/core/src/services/oss/core.rs
+++ b/core/src/services/oss/core.rs
@@ -487,27 +487,13 @@ impl OssCore {
     }
 
     pub async fn oss_initiate_upload(
-        &self,
-        path: &str,
-        args: &OpWrite,
-    ) -> Result<Response<IncomingAsyncBody>> {
-        let cache_control = args.cache_control();
-        let req = self
-            .oss_initiate_upload_request(path, None, None, cache_control, AsyncBody::Empty, false)
-            .await?;
-        self.send(req).await
-    }
-
-    /// Creates a request that initiates multipart upload
-    async fn oss_initiate_upload_request(
         &self,
         path: &str,
         content_type: Option<&str>,
         content_disposition: Option<&str>,
         cache_control: Option<&str>,
-        body: AsyncBody,
         is_presign: bool,
-    ) -> Result<Request<AsyncBody>> {
+    ) -> Result<Response<IncomingAsyncBody>> {
         let path = build_abs_path(&self.root, path);
         let endpoint = self.get_endpoint(is_presign);
         let url = format!("{}/{}?uploads", endpoint, percent_encode_path(&path));
@@ -522,9 +508,11 @@ impl OssCore {
             req = req.header(CACHE_CONTROL, cache_control);
         }
         req = self.insert_sse_headers(req);
-        let mut req = req.body(body).map_err(new_request_build_error)?;
+        let mut req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
         self.sign(&mut req).await?;
-        Ok(req)
+        self.send(req).await
     }
 
     /// Creates a request to upload a part
@@ -534,9 +522,9 @@ impl OssCore {
         upload_id: &str,
         part_number: usize,
         is_presign: bool,
-        size: Option<u64>,
+        size: u64,
         body: AsyncBody,
-    ) -> Result<Request<AsyncBody>> {
+    ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
         let endpoint = self.get_endpoint(is_presign);
 
@@ -549,13 +537,10 @@ impl OssCore {
         );
 
         let mut req = Request::put(&url);
-
-        if let Some(size) = size {
-            req = req.header(CONTENT_LENGTH, size);
-        }
+        req = req.header(CONTENT_LENGTH, size);
         let mut req = req.body(body).map_err(new_request_build_error)?;
         self.sign(&mut req).await?;
-        Ok(req)
+        self.send(req).await
     }
 
     pub async fn oss_complete_multipart_upload_request(
@@ -563,7 +548,7 @@ impl OssCore {
         path: &str,
         upload_id: &str,
         is_presign: bool,
-        parts: &[MultipartUploadPart],
+        parts: Vec<MultipartUploadPart>,
     ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_abs_path(&self.root, path);
         let endpoint = self.get_endpoint(is_presign);
@@ -592,6 +577,29 @@ impl OssCore {
         self.sign(&mut req).await?;
         self.send(req).await
     }
+
+    /// Abort an on-going multipart upload.
+    /// reference docs https://www.alibabacloud.com/help/zh/oss/developer-reference/abortmultipartupload
+    pub async fn oss_abort_multipart_upload(
+        &self,
+        path: &str,
+        upload_id: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let url = format!(
+            "{}/{}?uploadId={}",
+            self.endpoint,
+            percent_encode_path(&p),
+            percent_encode_path(upload_id)
+        );
+
+        let mut req = Request::delete(&url)
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+        self.sign(&mut req).await?;
+        self.send(req).await
+    }
 }
 
 /// Request of DeleteObjects.
diff --git a/core/src/services/oss/writer.rs b/core/src/services/oss/writer.rs
index d81fbebf1..f2f737d65 100644
--- a/core/src/services/oss/writer.rs
+++ b/core/src/services/oss/writer.rs
@@ -19,7 +19,6 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::Buf;
-use bytes::Bytes;
 use http::StatusCode;
 
 use super::core::*;
@@ -32,29 +31,28 @@ pub struct OssWriter {
 
     op: OpWrite,
     path: String,
-    upload_id: Option<String>,
-
-    parts: Vec<MultipartUploadPart>,
-    buffer: oio::VectorCursor,
-    buffer_size: usize,
 }
 
 impl OssWriter {
-    pub fn new(core: Arc<OssCore>, path: &str, op: OpWrite) -> Self {
-        let buffer_size = core.write_min_size;
-        OssWriter {
+    pub fn new(
+        core: Arc<OssCore>,
+        path: &str,
+        op: OpWrite,
+    ) -> oio::MultipartUploadWriter<OssWriter> {
+        let write_min_size = core.write_min_size;
+        let total_size = op.content_length();
+        let oss_writer = OssWriter {
             core,
             path: path.to_string(),
             op,
-
-            upload_id: None,
-            parts: vec![],
-            buffer: oio::VectorCursor::new(),
-            buffer_size,
-        }
+        };
+        oio::MultipartUploadWriter::new(oss_writer, total_size).with_write_min_size(write_min_size)
     }
+}
 
-    async fn write_oneshot(&self, size: u64, body: AsyncBody) -> Result<()> {
+#[async_trait]
+impl oio::MultipartUploadWrite for OssWriter {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
         let mut req = self.core.oss_put_object_request(
             &self.path,
             Some(size),
@@ -80,38 +78,51 @@ impl OssWriter {
         }
     }
 
-    async fn initiate_upload(&self) -> Result<String> {
-        let resp = self.core.oss_initiate_upload(&self.path, &self.op).await?;
-        match resp.status() {
+    async fn initiate_part(&self) -> Result<String> {
+        let resp = self
+            .core
+            .oss_initiate_upload(
+                &self.path,
+                self.op.content_type(),
+                self.op.content_disposition(),
+                self.op.cache_control(),
+                false,
+            )
+            .await?;
+
+        let status = resp.status();
+
+        match status {
             StatusCode::OK => {
                 let bs = resp.into_body().bytes().await?;
+
                 let result: InitiateMultipartUploadResult =
                     quick_xml::de::from_reader(bs.reader()).map_err(new_xml_deserialize_error)?;
+
                 Ok(result.upload_id)
             }
             _ => Err(parse_error(resp).await?),
         }
     }
 
-    async fn write_part(&self, upload_id: &str, bs: Bytes) -> Result<MultipartUploadPart> {
-        // Aliyun OSS requires part number must between [1..=10000]
-        let part_number = self.parts.len() + 1;
-        let mut req = self
+    async fn write_part(
+        &self,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: AsyncBody,
+    ) -> Result<oio::MultipartUploadPart> {
+        // OSS requires part number must between [1..=10000]
+        let part_number = part_number + 1;
+
+        let resp = self
             .core
-            .oss_upload_part_request(
-                &self.path,
-                upload_id,
-                part_number,
-                false,
-                Some(bs.len() as u64),
-                AsyncBody::Bytes(bs),
-            )
+            .oss_upload_part_request(&self.path, upload_id, part_number, false, size, body)
             .await?;
 
-        self.core.sign(&mut req).await?;
+        let status = resp.status();
 
-        let resp = self.core.send(req).await?;
-        match resp.status() {
+        match status {
             StatusCode::OK => {
                 let etag = parse_etag(resp.headers())?
                     .ok_or_else(|| {
@@ -121,103 +132,54 @@ impl OssWriter {
                         )
                     })?
                     .to_string();
+
                 resp.into_body().consume().await?;
-                Ok(MultipartUploadPart { part_number, etag })
+
+                Ok(oio::MultipartUploadPart { part_number, etag })
             }
             _ => Err(parse_error(resp).await?),
         }
     }
-}
 
-#[async_trait]
-impl oio::Write for OssWriter {
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let upload_id = match &self.upload_id {
-            Some(upload_id) => upload_id,
-            None => {
-                if self.op.content_length().unwrap_or_default() == bs.len() as u64 {
-                    return self
-                        .write_oneshot(bs.len() as u64, AsyncBody::Bytes(bs))
-                        .await;
-                } else {
-                    let upload_id = self.initiate_upload().await?;
-                    self.upload_id = Some(upload_id);
-                    self.upload_id.as_deref().unwrap()
-                }
-            }
-        };
+    async fn complete_part(
+        &self,
+        upload_id: &str,
+        parts: &[oio::MultipartUploadPart],
+    ) -> Result<()> {
+        let parts = parts
+            .iter()
+            .map(|p| MultipartUploadPart {
+                part_number: p.part_number,
+                etag: p.etag.clone(),
+            })
+            .collect();
 
-        // Ignore empty bytes
-        if bs.is_empty() {
-            return Ok(());
-        }
+        let resp = self
+            .core
+            .oss_complete_multipart_upload_request(&self.path, upload_id, false, parts)
+            .await?;
 
-        self.buffer.push(bs);
-        // Return directly if the buffer is not full
-        if self.buffer.len() <= self.buffer_size {
-            return Ok(());
-        }
+        let status = resp.status();
 
-        let bs = self.buffer.peak_at_least(self.buffer_size);
-        let size = bs.len();
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
 
-        match self.write_part(upload_id, bs).await {
-            Ok(part) => {
-                self.buffer.take(size);
-                self.parts.push(part);
                 Ok(())
             }
-            Err(e) => {
-                // If the upload fails, we should pop the given bs to make sure
-                // write is re-enter safe.
-                self.buffer.pop();
-                Err(e)
-            }
+            _ => Err(parse_error(resp).await?),
         }
     }
 
-    async fn sink(&mut self, size: u64, s: oio::Streamer) -> Result<()> {
-        self.write_oneshot(size, AsyncBody::Stream(s)).await
-    }
-
-    // TODO: we can cancel the upload by sending an abort request.
-    async fn abort(&mut self) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support abort",
-        ))
-    }
-
-    async fn close(&mut self) -> Result<()> {
-        let upload_id = if let Some(upload_id) = &self.upload_id {
-            upload_id
-        } else {
-            return Ok(());
-        };
-
-        // Make sure internal buffer has been flushed.
-        if !self.buffer.is_empty() {
-            let bs = self.buffer.peak_exact(self.buffer.len());
-
-            match self.write_part(upload_id, bs).await {
-                Ok(part) => {
-                    self.buffer.clear();
-                    self.parts.push(part);
-                }
-                Err(e) => {
-                    return Err(e);
-                }
-            }
-        }
-
+    async fn abort_part(&self, upload_id: &str) -> Result<()> {
         let resp = self
             .core
-            .oss_complete_multipart_upload_request(&self.path, upload_id, false, &self.parts)
+            .oss_abort_multipart_upload(&self.path, upload_id)
             .await?;
         match resp.status() {
-            StatusCode::OK => {
+            // OSS returns code 204 if abort succeeds.
+            StatusCode::NO_CONTENT => {
                 resp.into_body().consume().await?;
-
                 Ok(())
             }
             _ => Err(parse_error(resp).await?),

Reply via email to