This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new c058ee051 feat(core/raw): Add stream support for multipart (#2923)
c058ee051 is described below

commit c058ee05179eafaa3acd7466a62cec779b50560a
Author: Xuanwo <[email protected]>
AuthorDate: Thu Aug 24 15:36:25 2023 +0800

    feat(core/raw): Add stream support for multipart (#2923)
---
 core/benches/oio/write.rs           |   4 +-
 core/src/raw/http_util/multipart.rs | 586 ++++++++++++++++++++++++------------
 core/src/raw/oio/cursor.rs          |   6 +
 3 files changed, 401 insertions(+), 195 deletions(-)

diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 5c15ef53f..6e26ce7e0 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -17,7 +17,9 @@
 
 use criterion::Criterion;
 use once_cell::sync::Lazy;
-use opendal::raw::oio::{AtLeastBufWriter, ExactBufWriter, Write};
+use opendal::raw::oio::AtLeastBufWriter;
+use opendal::raw::oio::ExactBufWriter;
+use opendal::raw::oio::Write;
 use rand::thread_rng;
 use size::Size;
 
diff --git a/core/src/raw/http_util/multipart.rs b/core/src/raw/http_util/multipart.rs
index 1b64de3d0..d890ceed2 100644
--- a/core/src/raw/http_util/multipart.rs
+++ b/core/src/raw/http_util/multipart.rs
@@ -15,9 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::VecDeque;
 use std::mem;
 use std::str::FromStr;
+use std::task::ready;
+use std::task::Context;
+use std::task::Poll;
 
+use async_trait::async_trait;
 use bytes::Bytes;
 use bytes::BytesMut;
 use futures::stream;
@@ -39,6 +44,8 @@ use super::new_request_build_error;
 use super::AsyncBody;
 use super::IncomingAsyncBody;
 use crate::raw::oio;
+use crate::raw::oio::Stream;
+use crate::raw::oio::Streamer;
 use crate::*;
 
 /// Multipart is a builder for multipart/form-data.
@@ -105,67 +112,130 @@ impl<T: Part> Multipart<T> {
         Ok(self)
     }
 
-    pub(crate) fn build(&self) -> Bytes {
+    pub(crate) fn build(self) -> (u64, MultipartStream<T>) {
+        let mut total_size = 0;
+
         let mut bs = BytesMut::new();
+        bs.extend_from_slice(b"--");
+        bs.extend_from_slice(self.boundary.as_bytes());
+        bs.extend_from_slice(b"\r\n");
 
-        // Write headers.
-        for v in self.parts.iter() {
-            // Write the first boundary
-            bs.extend_from_slice(b"--");
-            bs.extend_from_slice(self.boundary.as_bytes());
-            bs.extend_from_slice(b"\r\n");
+        let pre_part = bs.freeze();
 
-            bs.extend_from_slice(v.format().as_ref());
+        let mut parts = VecDeque::new();
+        // Write headers.
+        for v in self.parts.into_iter() {
+            let (size, stream) = v.format();
+            total_size += pre_part.len() as u64 + size;
+            parts.push_back(stream);
         }
 
         // Write the last boundary
+        let mut bs = BytesMut::new();
         bs.extend_from_slice(b"--");
         bs.extend_from_slice(self.boundary.as_bytes());
         bs.extend_from_slice(b"--");
         bs.extend_from_slice(b"\r\n");
-
-        bs.freeze()
+        let final_part = bs.freeze();
+
+        total_size += final_part.len() as u64;
+
+        (
+            total_size,
+            MultipartStream {
+                pre_part,
+                pre_part_consumed: false,
+                parts,
+                final_part: Some(final_part),
+            },
+        )
     }
 
     /// Consume the input and generate a request with multipart body.
     ///
-    /// This founction will make sure content_type and content_length set correctly.
+    /// This function will make sure content_type and content_length set correctly.
     pub fn apply(self, mut builder: http::request::Builder) -> Result<Request<AsyncBody>> {
-        let bs = self.build();
+        let boundary = self.boundary.clone();
+        let (content_length, stream) = self.build();
 
         // Insert content type with correct boundary.
         builder = builder.header(
             CONTENT_TYPE,
-            format!("multipart/{}; boundary={}", T::TYPE, self.boundary).as_str(),
+            format!("multipart/{}; boundary={}", T::TYPE, boundary).as_str(),
         );
         // Insert content length with calculated size.
-        builder = builder.header(CONTENT_LENGTH, bs.len());
+        builder = builder.header(CONTENT_LENGTH, content_length);
 
         builder
-            .body(AsyncBody::Bytes(bs))
+            .body(AsyncBody::Stream(Box::new(stream)))
             .map_err(new_request_build_error)
     }
 }
 
+pub struct MultipartStream<T: Part> {
+    pre_part: Bytes,
+    pre_part_consumed: bool,
+
+    parts: VecDeque<T::STREAM>,
+
+    final_part: Option<Bytes>,
+}
+
+impl<T: Part> Stream for MultipartStream<T> {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        if let Some(stream) = self.parts.front_mut() {
+            if !self.pre_part_consumed {
+                self.pre_part_consumed = true;
+                return Poll::Ready(Some(Ok(self.pre_part.clone())));
+            }
+            return match ready!(stream.poll_next(cx)) {
+                None => {
+                    self.pre_part_consumed = false;
+                    self.parts.pop_front();
+                    return self.poll_next(cx);
+                }
+                Some(v) => Poll::Ready(Some(v)),
+            };
+        }
+
+        if let Some(final_part) = self.final_part.take() {
+            return Poll::Ready(Some(Ok(final_part)));
+        }
+
+        Poll::Ready(None)
+    }
+
+    /// It's possible to implement reset by calling stream's `poll_reset`.
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "MultipartStream doesn't support reset yet",
+        )))
+    }
+}
+
 /// Part is a trait for multipart part.
-pub trait Part: Sized {
+pub trait Part: Sized + 'static {
     /// TYPE is the type of multipart.
     ///
     /// Current available types are: `form-data` and `mixed`
     const TYPE: &'static str;
+    /// STREAM is the stream representation of this part which can be used in multipart body.
+    type STREAM: Stream;
 
     /// format will generates the bytes.
-    fn format(&self) -> Bytes;
+    fn format(self) -> (u64, Self::STREAM);
 
     /// parse will parse the bytes into a part.
     fn parse(s: &str) -> Result<Self>;
 }
 
 /// FormDataPart is a builder for multipart/form-data part.
-#[derive(Debug)]
 pub struct FormDataPart {
     headers: HeaderMap,
-    content: Bytes,
+
+    content_length: u64,
+    content: Streamer,
 }
 
 impl FormDataPart {
@@ -184,7 +254,8 @@ impl FormDataPart {
 
         Self {
             headers,
-            content: Bytes::new(),
+            content_length: 0,
+            content: Box::new(oio::Cursor::new()),
         }
     }
 
@@ -196,31 +267,48 @@ impl FormDataPart {
 
     /// Set the content for this part.
     pub fn content(mut self, content: impl Into<Bytes>) -> Self {
-        self.content = content.into();
+        let content = content.into();
+
+        self.content_length = content.len() as u64;
+        self.content = Box::new(oio::Cursor::from(content));
+        self
+    }
+
+    /// Set the stream content for this part.
+    pub fn stream(mut self, size: u64, content: Streamer) -> Self {
+        self.content_length = size;
+        self.content = content;
         self
     }
 }
 
 impl Part for FormDataPart {
     const TYPE: &'static str = "form-data";
+    type STREAM = FormDataPartStream;
 
-    fn format(&self) -> Bytes {
+    fn format(self) -> (u64, FormDataPartStream) {
         let mut bs = BytesMut::new();
 
-        // Write headers.
+        // Building pre-content.
         for (k, v) in self.headers.iter() {
             bs.extend_from_slice(k.as_str().as_bytes());
             bs.extend_from_slice(b": ");
             bs.extend_from_slice(v.as_bytes());
             bs.extend_from_slice(b"\r\n");
         }
-
-        // Write content.
         bs.extend_from_slice(b"\r\n");
-        bs.extend_from_slice(&self.content);
-        bs.extend_from_slice(b"\r\n");
-
-        bs.freeze()
+        let bs = bs.freeze();
+
+        // pre-content + content + post-content (b`\r\n`)
+        let total_size = bs.len() as u64 + self.content_length + 2;
+
+        (
+            total_size,
+            FormDataPartStream {
+                pre_content: Some(bs),
+                content: Some(self.content),
+            },
+        )
     }
 
     fn parse(_: &str) -> Result<Self> {
@@ -231,16 +319,50 @@ impl Part for FormDataPart {
     }
 }
 
+pub struct FormDataPartStream {
+    /// Including headers and the first `b\r\n`
+    pre_content: Option<Bytes>,
+    content: Option<Streamer>,
+}
+
+#[async_trait]
+impl Stream for FormDataPartStream {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        if let Some(pre_content) = self.pre_content.take() {
+            return Poll::Ready(Some(Ok(pre_content)));
+        }
+
+        if let Some(stream) = self.content.as_mut() {
+            return match ready!(stream.poll_next(cx)) {
+                None => {
+                    self.content = None;
+                    Poll::Ready(Some(Ok(Bytes::from_static(b"\r\n"))))
+                }
+                Some(v) => Poll::Ready(Some(v)),
+            };
+        }
+
+        Poll::Ready(None)
+    }
+
+    /// It's possible to implement reset by calling stream's `poll_reset`.
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "FormDataPartStream doesn't support reset yet",
+        )))
+    }
+}
+
 /// MixedPart is a builder for multipart/mixed part.
-#[derive(Debug)]
-#[cfg_attr(test, derive(Eq, PartialEq))]
 pub struct MixedPart {
     part_headers: HeaderMap,
 
     /// Common
     version: Version,
     headers: HeaderMap,
-    content: Bytes,
+    content_length: u64,
+    content: Option<Streamer>,
 
     /// Request only
     method: Option<Method>,
@@ -264,7 +386,8 @@ impl MixedPart {
 
             version: Version::HTTP_11,
             headers: HeaderMap::new(),
-            content: Bytes::new(),
+            content_length: 0,
+            content: None,
 
             uri: Some(uri),
             method: None,
@@ -281,9 +404,12 @@ impl MixedPart {
 
         let (parts, body) = req.into_parts();
 
-        let content = match body {
-            AsyncBody::Empty => Bytes::new(),
-            AsyncBody::Bytes(bs) => bs,
+        let (content_length, content) = match body {
+            AsyncBody::Empty => (0, None),
+            AsyncBody::Bytes(bs) => (
+                bs.len() as u64,
+                Some(Box::new(oio::Cursor::from(bs)) as Streamer),
+            ),
             AsyncBody::Stream(_) => panic!("multipart request can't contain stream body"),
         };
 
@@ -301,6 +427,7 @@ impl MixedPart {
             ),
             version: parts.version,
             headers: parts.headers,
+            content_length,
             content,
 
             method: Some(parts.method),
@@ -317,12 +444,11 @@ impl MixedPart {
         // Swap headers directly instead of copy the entire map.
         mem::swap(builder.headers_mut().unwrap(), &mut self.headers);
 
-        let bs: Bytes = self.content;
-        let length = bs.len();
-        let body = IncomingAsyncBody::new(
-            Box::new(oio::into_stream(stream::iter(vec![Ok(bs)]))),
-            Some(length as u64),
-        );
+        let body = if let Some(stream) = self.content {
+            IncomingAsyncBody::new(stream, Some(self.content_length))
+        } else {
+            IncomingAsyncBody::new(Box::new(oio::into_stream(stream::empty())), Some(0))
+        };
 
         builder
             .body(body)
@@ -355,15 +481,26 @@ impl MixedPart {
 
     /// Set the content for this part.
     pub fn content(mut self, content: impl Into<Bytes>) -> Self {
-        self.content = content.into();
+        let content = content.into();
+
+        self.content_length = content.len() as u64;
+        self.content = Some(Box::new(oio::Cursor::from(content)));
+        self
+    }
+
+    /// Set the stream content for this part.
+    pub fn stream(mut self, size: u64, content: Streamer) -> Self {
+        self.content_length = size;
+        self.content = Some(content);
         self
     }
 }
 
 impl Part for MixedPart {
     const TYPE: &'static str = "mixed";
+    type STREAM = MixedPartStream;
 
-    fn format(&self) -> Bytes {
+    fn format(self) -> (u64, Self::STREAM) {
         let mut bs = BytesMut::new();
 
         // Write parts headers.
@@ -420,15 +557,24 @@ impl Part for MixedPart {
             bs.extend_from_slice(v.as_bytes());
             bs.extend_from_slice(b"\r\n");
         }
-
-        // Write content.
         bs.extend_from_slice(b"\r\n");
-        if !self.content.is_empty() {
-            bs.extend_from_slice(&self.content);
-            bs.extend_from_slice(b"\r\n");
+
+        let bs = bs.freeze();
+
+        // pre-content + content + post-content;
+        let mut total_size = bs.len() as u64;
+
+        if self.content.is_some() {
+            total_size += self.content_length + 2;
         }
 
-        bs.freeze()
+        (
+            total_size,
+            MixedPartStream {
+                pre_content: Some(bs),
+                content: self.content,
+            },
+        )
     }
 
     /// TODO
@@ -465,6 +611,7 @@ impl Part for MixedPart {
         let parts = http_response.split("\r\n\r\n").collect::<Vec<&str>>();
         let headers_content = parts[0];
         let body_content = parts.get(1).unwrap_or(&"");
+        let body_bytes = Bytes::from(body_content.to_string());
 
         let status_line = headers_content.lines().next().unwrap_or("");
         let status_code = status_line
@@ -501,7 +648,8 @@ impl Part for MixedPart {
             part_headers,
             version: Version::HTTP_11,
             headers,
-            content: Bytes::from(body_content.to_string()),
+            content_length: body_bytes.len() as u64,
+            content: Some(Box::new(oio::Cursor::from(body_bytes))),
 
             method: None,
             uri: None,
@@ -517,21 +665,58 @@ impl Part for MixedPart {
     }
 }
 
+pub struct MixedPartStream {
+    /// Including headers and the first `b\r\n`
+    pre_content: Option<Bytes>,
+    content: Option<Streamer>,
+}
+
+impl Stream for MixedPartStream {
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        if let Some(pre_content) = self.pre_content.take() {
+            return Poll::Ready(Some(Ok(pre_content)));
+        }
+
+        if let Some(stream) = self.content.as_mut() {
+            return match ready!(stream.poll_next(cx)) {
+                None => {
+                    self.content = None;
+                    Poll::Ready(Some(Ok(Bytes::from_static(b"\r\n"))))
+                }
+                Some(v) => Poll::Ready(Some(v)),
+            };
+        }
+
+        Poll::Ready(None)
+    }
+
+    /// It's possible to implement reset by calling stream's `poll_reset`.
+    fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "MixedPartStream doesn't support reset yet",
+        )))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use http::header::CONTENT_TYPE;
     use pretty_assertions::assert_eq;
 
     use super::*;
+    use crate::raw::oio::StreamExt;
 
-    #[test]
-    fn test_multipart_formdata_basic() {
+    #[tokio::test]
+    async fn test_multipart_formdata_basic() -> Result<()> {
         let multipart = Multipart::new()
             .with_boundary("lalala")
             .part(FormDataPart::new("foo").content(Bytes::from("bar")))
             .part(FormDataPart::new("hello").content(Bytes::from("world")));
 
-        let body = multipart.build();
+        let (size, body) = multipart.build();
+        let bs = body.collect().await.unwrap();
+        assert_eq!(size, bs.len() as u64);
 
         let expected = "--lalala\r\n\
              content-disposition: form-data; name=\"foo\"\r\n\
@@ -543,12 +728,13 @@ mod tests {
              world\r\n\
              --lalala--\r\n";
 
-        assert_eq!(expected, String::from_utf8(body.to_vec()).unwrap());
+        assert_eq!(Bytes::from(expected), bs);
+        Ok(())
     }
 
     /// This test is inspired by <https://docs.aws.amazon.com/AmazonS3/latest/userguide/HTTPPOSTExamples.html>
-    #[test]
-    fn test_multipart_formdata_s3_form_upload() {
+    #[tokio::test]
+    async fn test_multipart_formdata_s3_form_upload() -> Result<()> {
         let multipart = Multipart::new()
             .with_boundary("9431149156168")
             .part(FormDataPart::new("key").content("user/eric/MyPicture.jpg"))
@@ -564,7 +750,9 @@ mod tests {
             
.part(FormDataPart::new("Signature").content("0RavWzkygo6QX9caELEqKi9kDbU="))
             .part(FormDataPart::new("file").header(CONTENT_TYPE, "image/jpeg".parse().unwrap()).content("...file content...")).part(FormDataPart::new("submit").content("Upload to Amazon S3"));
 
-        let body = multipart.build();
+        let (size, body) = multipart.build();
+        let bs = body.collect().await?;
+        assert_eq!(size, bs.len() as u64);
 
         let expected = r#"--9431149156168
 content-disposition: form-data; name="key"
@@ -618,15 +806,17 @@ Upload to Amazon S3
             expected,
             // Rust can't represent `\r` in a string literal, so we
             // replace `\r\n` with `\n` for comparison
-            String::from_utf8(body.to_vec())
+            String::from_utf8(bs.to_vec())
                 .unwrap()
                 .replace("\r\n", "\n")
         );
+
+        Ok(())
     }
 
     /// This test is inspired by <https://cloud.google.com/storage/docs/batch>
-    #[test]
-    fn test_multipart_mixed_gcs_batch_metadata() {
+    #[tokio::test]
+    async fn test_multipart_mixed_gcs_batch_metadata() -> Result<()> {
         let multipart = Multipart::new()
             .with_boundary("===============7330845974216740156==")
             .part(
@@ -684,7 +874,9 @@ Upload to Amazon S3
                     .content(r#"{"metadata": {"type": "calico"}}"#),
             );
 
-        let body = multipart.build();
+        let (size, body) = multipart.build();
+        let bs = body.collect().await?;
+        assert_eq!(size, bs.len() as u64);
 
         let expected = r#"--===============7330845974216740156==
 Content-Type: application/http
@@ -726,15 +918,17 @@ content-length: 32
             expected,
             // Rust can't represent `\r` in a string literal, so we
             // replace `\r\n` with `\n` for comparison
-            String::from_utf8(body.to_vec())
+            String::from_utf8(bs.to_vec())
                 .unwrap()
                 .replace("\r\n", "\n")
         );
+
+        Ok(())
     }
 
     /// This test is inspired by <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=azure-ad>
-    #[test]
-    fn test_multipart_mixed_azblob_batch_delete() {
+    #[tokio::test]
+    async fn test_multipart_mixed_azblob_batch_delete() -> Result<()> {
         let multipart = Multipart::new()
             .with_boundary("batch_357de4f7-6d0b-4e02-8cd2-6361411a9525")
             .part(
@@ -786,7 +980,9 @@ content-length: 32
                     .header("content-length".parse().unwrap(), "0".parse().unwrap()),
             );
 
-        let body = multipart.build();
+        let (size, body) = multipart.build();
+        let bs = body.collect().await?;
+        assert_eq!(size, bs.len() as u64);
 
         let expected = r#"--batch_357de4f7-6d0b-4e02-8cd2-6361411a9525
 Content-Type: application/http
@@ -825,10 +1021,12 @@ content-length: 0
             expected,
             // Rust can't represent `\r` in a string literal, so we
             // replace `\r\n` with `\n` for comparison
-            String::from_utf8(body.to_vec())
+            String::from_utf8(bs.to_vec())
                 .unwrap()
                 .replace("\r\n", "\n")
         );
+
+        Ok(())
     }
 
     /// This test is inspired by <https://cloud.google.com/storage/docs/batch>
@@ -883,137 +1081,137 @@ Content-Length: 846
             .parse(Bytes::from(response))
             .unwrap();
 
+        let part0_bs = Bytes::from_static(
+            r#"{"kind": "storage#object","id": "example-bucket/obj1/1495822576643790","metadata": {"type": "tabby"}}"#.as_bytes());
+        let part1_bs = Bytes::from_static(
+            r#"{"kind": "storage#object","id": "example-bucket/obj2/1495822576643790","metadata": {"type": "tuxedo"}}"#
+                .as_bytes()
+        );
+        let part2_bs = Bytes::from_static(
+            r#"{"kind": "storage#object","id": "example-bucket/obj3/1495822576643790","metadata": {"type": "calico"}}"#
+                .as_bytes()
+        );
+
         assert_eq!(multipart.parts.len(), 3);
+
+        assert_eq!(multipart.parts[0].part_headers, {
+            let mut h = HeaderMap::new();
+            h.insert("Content-Type", "application/http".parse().unwrap());
+            h.insert(
+                "Content-ID",
+                "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+1>"
+                    .parse()
+                    .unwrap(),
+            );
+
+            h
+        });
+        assert_eq!(multipart.parts[0].version, Version::HTTP_11);
+        assert_eq!(multipart.parts[0].headers, {
+            let mut h = HeaderMap::new();
+            h.insert(
+                "ETag",
+                "\"lGaP-E0memYDumK16YuUDM_6Gf0/V43j6azD55CPRGb9b6uytDYl61Y\""
+                    .parse()
+                    .unwrap(),
+            );
+            h.insert(
+                "Content-Type",
+                "application/json; charset=UTF-8".parse().unwrap(),
+            );
+            h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
+            h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
+            h.insert("Cache-Control", "private, max-age=0".parse().unwrap());
+            h.insert("Content-Length", "846".parse().unwrap());
+
+            h
+        });
+        assert_eq!(multipart.parts[0].content_length, part0_bs.len() as u64);
+        assert_eq!(multipart.parts[0].uri, None);
+        assert_eq!(multipart.parts[0].method, None);
         assert_eq!(
-            multipart.parts[0],
-            MixedPart {
-                part_headers: {
-                    let mut h = HeaderMap::new();
-                    h.insert("Content-Type", "application/http".parse().unwrap());
-                    h.insert(
-                        "Content-ID",
-                        "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+1>"
-                            .parse()
-                            .unwrap(),
-                    );
-
-                    h
-                },
-                version: Version::HTTP_11,
-                headers: {
-                    let mut h = HeaderMap::new();
-                    h.insert(
-                        "ETag",
-                        "\"lGaP-E0memYDumK16YuUDM_6Gf0/V43j6azD55CPRGb9b6uytDYl61Y\""
-                            .parse()
-                            .unwrap(),
-                    );
-                    h.insert(
-                        "Content-Type",
-                        "application/json; charset=UTF-8".parse().unwrap(),
-                    );
-                    h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
-                    h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
-                    h.insert("Cache-Control", "private, max-age=0".parse().unwrap());
-                    h.insert("Content-Length", "846".parse().unwrap());
-
-                    h
-                },
-                content: Bytes::from_static(
-                    r#"{"kind": "storage#object","id": "example-bucket/obj1/1495822576643790","metadata": {"type": "tabby"}}"#
-                    .as_bytes()
-                ),
-                uri: None,
-                method: None,
-                status_code: Some(StatusCode::from_u16(200).unwrap())
-            }
+            multipart.parts[0].status_code,
+            Some(StatusCode::from_u16(200).unwrap())
         );
+
+        assert_eq!(multipart.parts[1].part_headers, {
+            let mut h = HeaderMap::new();
+            h.insert("Content-Type", "application/http".parse().unwrap());
+            h.insert(
+                "Content-ID",
+                "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+2>"
+                    .parse()
+                    .unwrap(),
+            );
+
+            h
+        });
+        assert_eq!(multipart.parts[1].version, Version::HTTP_11);
+        assert_eq!(multipart.parts[1].headers, {
+            let mut h = HeaderMap::new();
+            h.insert(
+                "ETag",
+                "\"lGaP-E0memYDumK16YuUDM_6Gf0/91POdd-sxSAkJnS8Dm7wMxBSDKk\""
+                    .parse()
+                    .unwrap(),
+            );
+            h.insert(
+                "Content-Type",
+                "application/json; charset=UTF-8".parse().unwrap(),
+            );
+            h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
+            h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
+            h.insert("Cache-Control", "private, max-age=0".parse().unwrap());
+            h.insert("Content-Length", "846".parse().unwrap());
+
+            h
+        });
+        assert_eq!(multipart.parts[1].content_length, part1_bs.len() as u64);
+        assert_eq!(multipart.parts[1].uri, None);
+        assert_eq!(multipart.parts[1].method, None);
         assert_eq!(
-            multipart.parts[1],
-            MixedPart {
-                part_headers: {
-                    let mut h = HeaderMap::new();
-                    h.insert("Content-Type", "application/http".parse().unwrap());
-                    h.insert(
-                        "Content-ID",
-                        "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+2>"
-                            .parse()
-                            .unwrap(),
-                    );
-
-                    h
-                },
-                version: Version::HTTP_11,
-                headers: {
-                    let mut h = HeaderMap::new();
-                    h.insert(
-                        "ETag",
-                        "\"lGaP-E0memYDumK16YuUDM_6Gf0/91POdd-sxSAkJnS8Dm7wMxBSDKk\""
-                            .parse()
-                            .unwrap(),
-                    );
-                    h.insert(
-                        "Content-Type",
-                        "application/json; charset=UTF-8".parse().unwrap(),
-                    );
-                    h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
-                    h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
-                    h.insert("Cache-Control", "private, max-age=0".parse().unwrap());
-                    h.insert("Content-Length", "846".parse().unwrap());
-
-                    h
-                },
-                content: Bytes::from_static(
-                    r#"{"kind": "storage#object","id": "example-bucket/obj2/1495822576643790","metadata": {"type": "tuxedo"}}"#
-                    .as_bytes()
-                ),
-                uri: None,
-                method: None,
-                status_code: Some(StatusCode::from_u16(200).unwrap())
-            }
-         );
+            multipart.parts[1].status_code,
+            Some(StatusCode::from_u16(200).unwrap())
+        );
+
+        assert_eq!(multipart.parts[2].part_headers, {
+            let mut h = HeaderMap::new();
+            h.insert("Content-Type", "application/http".parse().unwrap());
+            h.insert(
+                "Content-ID",
+                "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+3>"
+                    .parse()
+                    .unwrap(),
+            );
+
+            h
+        });
+        assert_eq!(multipart.parts[2].version, Version::HTTP_11);
+        assert_eq!(multipart.parts[2].headers, {
+            let mut h = HeaderMap::new();
+            h.insert(
+                "ETag",
+                "\"lGaP-E0memYDumK16YuUDM_6Gf0/d2Z1F1_ZVbB1dC0YKM9rX5VAgIQ\""
+                    .parse()
+                    .unwrap(),
+            );
+            h.insert(
+                "Content-Type",
+                "application/json; charset=UTF-8".parse().unwrap(),
+            );
+            h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
+            h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
+            h.insert("Cache-Control", "private, max-age=0".parse().unwrap());
+            h.insert("Content-Length", "846".parse().unwrap());
+
+            h
+        });
+        assert_eq!(multipart.parts[2].content_length, part2_bs.len() as u64);
+        assert_eq!(multipart.parts[2].uri, None);
+        assert_eq!(multipart.parts[2].method, None);
         assert_eq!(
-            multipart.parts[2],
-            MixedPart {
-                part_headers: {
-                    let mut h = HeaderMap::new();
-                    h.insert("Content-Type", "application/http".parse().unwrap());
-                    h.insert(
-                        "Content-ID",
-                        "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+3>"
-                            .parse()
-                            .unwrap(),
-                    );
-
-                    h
-                },
-                version: Version::HTTP_11,
-                headers: {
-                    let mut h = HeaderMap::new();
-                    h.insert(
-                        "ETag",
-                        "\"lGaP-E0memYDumK16YuUDM_6Gf0/d2Z1F1_ZVbB1dC0YKM9rX5VAgIQ\""
-                            .parse()
-                            .unwrap(),
-                    );
-                    h.insert(
-                        "Content-Type",
-                        "application/json; charset=UTF-8".parse().unwrap(),
-                    );
-                    h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
-                    h.insert("Expires", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
-                    h.insert("Cache-Control", "private, max-age=0".parse().unwrap());
-                    h.insert("Content-Length", "846".parse().unwrap());
-
-                    h
-                },
-                content: Bytes::from_static(
-                    r#"{"kind": "storage#object","id": "example-bucket/obj3/1495822576643790","metadata": {"type": "calico"}}"#
-                    .as_bytes()
-                ),
-                uri: None,
-                method: None,
-                status_code: Some(StatusCode::from_u16(200).unwrap())
-            });
+            multipart.parts[2].status_code,
+            Some(StatusCode::from_u16(200).unwrap())
+        );
     }
 }
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index cd683138e..c9b670ead 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -30,12 +30,18 @@ use crate::raw::*;
 use crate::*;
 
 /// Cursor is the cursor for [`Bytes`] that implements [`oio::Read`]
+#[derive(Default)]
 pub struct Cursor {
     inner: Bytes,
     pos: u64,
 }
 
 impl Cursor {
+    /// Create a new empty cursor.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
     /// Returns `true` if the remaining slice is empty.
     pub fn is_empty(&self) -> bool {
         self.pos as usize >= self.inner.len()

Reply via email to