This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new c058ee051 feat(core/raw): Add stream support for multipart (#2923)
c058ee051 is described below
commit c058ee05179eafaa3acd7466a62cec779b50560a
Author: Xuanwo <[email protected]>
AuthorDate: Thu Aug 24 15:36:25 2023 +0800
feat(core/raw): Add stream support for multipart (#2923)
---
core/benches/oio/write.rs | 4 +-
core/src/raw/http_util/multipart.rs | 586 ++++++++++++++++++++++++------------
core/src/raw/oio/cursor.rs | 6 +
3 files changed, 401 insertions(+), 195 deletions(-)
diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs
index 5c15ef53f..6e26ce7e0 100644
--- a/core/benches/oio/write.rs
+++ b/core/benches/oio/write.rs
@@ -17,7 +17,9 @@
use criterion::Criterion;
use once_cell::sync::Lazy;
-use opendal::raw::oio::{AtLeastBufWriter, ExactBufWriter, Write};
+use opendal::raw::oio::AtLeastBufWriter;
+use opendal::raw::oio::ExactBufWriter;
+use opendal::raw::oio::Write;
use rand::thread_rng;
use size::Size;
diff --git a/core/src/raw/http_util/multipart.rs
b/core/src/raw/http_util/multipart.rs
index 1b64de3d0..d890ceed2 100644
--- a/core/src/raw/http_util/multipart.rs
+++ b/core/src/raw/http_util/multipart.rs
@@ -15,9 +15,14 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::VecDeque;
use std::mem;
use std::str::FromStr;
+use std::task::ready;
+use std::task::Context;
+use std::task::Poll;
+use async_trait::async_trait;
use bytes::Bytes;
use bytes::BytesMut;
use futures::stream;
@@ -39,6 +44,8 @@ use super::new_request_build_error;
use super::AsyncBody;
use super::IncomingAsyncBody;
use crate::raw::oio;
+use crate::raw::oio::Stream;
+use crate::raw::oio::Streamer;
use crate::*;
/// Multipart is a builder for multipart/form-data.
@@ -105,67 +112,130 @@ impl<T: Part> Multipart<T> {
Ok(self)
}
- pub(crate) fn build(&self) -> Bytes {
+ pub(crate) fn build(self) -> (u64, MultipartStream<T>) {
+ let mut total_size = 0;
+
let mut bs = BytesMut::new();
+ bs.extend_from_slice(b"--");
+ bs.extend_from_slice(self.boundary.as_bytes());
+ bs.extend_from_slice(b"\r\n");
- // Write headers.
- for v in self.parts.iter() {
- // Write the first boundary
- bs.extend_from_slice(b"--");
- bs.extend_from_slice(self.boundary.as_bytes());
- bs.extend_from_slice(b"\r\n");
+ let pre_part = bs.freeze();
- bs.extend_from_slice(v.format().as_ref());
+ let mut parts = VecDeque::new();
+ // Write headers.
+ for v in self.parts.into_iter() {
+ let (size, stream) = v.format();
+ total_size += pre_part.len() as u64 + size;
+ parts.push_back(stream);
}
// Write the last boundary
+ let mut bs = BytesMut::new();
bs.extend_from_slice(b"--");
bs.extend_from_slice(self.boundary.as_bytes());
bs.extend_from_slice(b"--");
bs.extend_from_slice(b"\r\n");
-
- bs.freeze()
+ let final_part = bs.freeze();
+
+ total_size += final_part.len() as u64;
+
+ (
+ total_size,
+ MultipartStream {
+ pre_part,
+ pre_part_consumed: false,
+ parts,
+ final_part: Some(final_part),
+ },
+ )
}
/// Consume the input and generate a request with multipart body.
///
- /// This founction will make sure content_type and content_length set
correctly.
+ /// This function will make sure content_type and content_length set
correctly.
pub fn apply(self, mut builder: http::request::Builder) ->
Result<Request<AsyncBody>> {
- let bs = self.build();
+ let boundary = self.boundary.clone();
+ let (content_length, stream) = self.build();
// Insert content type with correct boundary.
builder = builder.header(
CONTENT_TYPE,
- format!("multipart/{}; boundary={}", T::TYPE,
self.boundary).as_str(),
+ format!("multipart/{}; boundary={}", T::TYPE, boundary).as_str(),
);
// Insert content length with calculated size.
- builder = builder.header(CONTENT_LENGTH, bs.len());
+ builder = builder.header(CONTENT_LENGTH, content_length);
builder
- .body(AsyncBody::Bytes(bs))
+ .body(AsyncBody::Stream(Box::new(stream)))
.map_err(new_request_build_error)
}
}
+pub struct MultipartStream<T: Part> {
+ pre_part: Bytes,
+ pre_part_consumed: bool,
+
+ parts: VecDeque<T::STREAM>,
+
+ final_part: Option<Bytes>,
+}
+
+impl<T: Part> Stream for MultipartStream<T> {
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ if let Some(stream) = self.parts.front_mut() {
+ if !self.pre_part_consumed {
+ self.pre_part_consumed = true;
+ return Poll::Ready(Some(Ok(self.pre_part.clone())));
+ }
+ return match ready!(stream.poll_next(cx)) {
+ None => {
+ self.pre_part_consumed = false;
+ self.parts.pop_front();
+ return self.poll_next(cx);
+ }
+ Some(v) => Poll::Ready(Some(v)),
+ };
+ }
+
+ if let Some(final_part) = self.final_part.take() {
+ return Poll::Ready(Some(Ok(final_part)));
+ }
+
+ Poll::Ready(None)
+ }
+
+ /// It's possible to implement reset by calling stream's `poll_reset`.
+ fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+ Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "MultipartStream doesn't support reset yet",
+ )))
+ }
+}
+
/// Part is a trait for multipart part.
-pub trait Part: Sized {
+pub trait Part: Sized + 'static {
/// TYPE is the type of multipart.
///
/// Current available types are: `form-data` and `mixed`
const TYPE: &'static str;
+ /// STREAM is the stream representation of this part which can be used in
multipart body.
+ type STREAM: Stream;
/// format will generates the bytes.
- fn format(&self) -> Bytes;
+ fn format(self) -> (u64, Self::STREAM);
/// parse will parse the bytes into a part.
fn parse(s: &str) -> Result<Self>;
}
/// FormDataPart is a builder for multipart/form-data part.
-#[derive(Debug)]
pub struct FormDataPart {
headers: HeaderMap,
- content: Bytes,
+
+ content_length: u64,
+ content: Streamer,
}
impl FormDataPart {
@@ -184,7 +254,8 @@ impl FormDataPart {
Self {
headers,
- content: Bytes::new(),
+ content_length: 0,
+ content: Box::new(oio::Cursor::new()),
}
}
@@ -196,31 +267,48 @@ impl FormDataPart {
/// Set the content for this part.
pub fn content(mut self, content: impl Into<Bytes>) -> Self {
- self.content = content.into();
+ let content = content.into();
+
+ self.content_length = content.len() as u64;
+ self.content = Box::new(oio::Cursor::from(content));
+ self
+ }
+
+ /// Set the stream content for this part.
+ pub fn stream(mut self, size: u64, content: Streamer) -> Self {
+ self.content_length = size;
+ self.content = content;
self
}
}
impl Part for FormDataPart {
const TYPE: &'static str = "form-data";
+ type STREAM = FormDataPartStream;
- fn format(&self) -> Bytes {
+ fn format(self) -> (u64, FormDataPartStream) {
let mut bs = BytesMut::new();
- // Write headers.
+ // Building pre-content.
for (k, v) in self.headers.iter() {
bs.extend_from_slice(k.as_str().as_bytes());
bs.extend_from_slice(b": ");
bs.extend_from_slice(v.as_bytes());
bs.extend_from_slice(b"\r\n");
}
-
- // Write content.
bs.extend_from_slice(b"\r\n");
- bs.extend_from_slice(&self.content);
- bs.extend_from_slice(b"\r\n");
-
- bs.freeze()
+ let bs = bs.freeze();
+
+ // pre-content + content + post-content (b`\r\n`)
+ let total_size = bs.len() as u64 + self.content_length + 2;
+
+ (
+ total_size,
+ FormDataPartStream {
+ pre_content: Some(bs),
+ content: Some(self.content),
+ },
+ )
}
fn parse(_: &str) -> Result<Self> {
@@ -231,16 +319,50 @@ impl Part for FormDataPart {
}
}
+pub struct FormDataPartStream {
+ /// Including headers and the first `b\r\n`
+ pre_content: Option<Bytes>,
+ content: Option<Streamer>,
+}
+
+#[async_trait]
+impl Stream for FormDataPartStream {
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ if let Some(pre_content) = self.pre_content.take() {
+ return Poll::Ready(Some(Ok(pre_content)));
+ }
+
+ if let Some(stream) = self.content.as_mut() {
+ return match ready!(stream.poll_next(cx)) {
+ None => {
+ self.content = None;
+ Poll::Ready(Some(Ok(Bytes::from_static(b"\r\n"))))
+ }
+ Some(v) => Poll::Ready(Some(v)),
+ };
+ }
+
+ Poll::Ready(None)
+ }
+
+ /// It's possible to implement reset by calling stream's `poll_reset`.
+ fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+ Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "FormDataPartStream doesn't support reset yet",
+ )))
+ }
+}
+
/// MixedPart is a builder for multipart/mixed part.
-#[derive(Debug)]
-#[cfg_attr(test, derive(Eq, PartialEq))]
pub struct MixedPart {
part_headers: HeaderMap,
/// Common
version: Version,
headers: HeaderMap,
- content: Bytes,
+ content_length: u64,
+ content: Option<Streamer>,
/// Request only
method: Option<Method>,
@@ -264,7 +386,8 @@ impl MixedPart {
version: Version::HTTP_11,
headers: HeaderMap::new(),
- content: Bytes::new(),
+ content_length: 0,
+ content: None,
uri: Some(uri),
method: None,
@@ -281,9 +404,12 @@ impl MixedPart {
let (parts, body) = req.into_parts();
- let content = match body {
- AsyncBody::Empty => Bytes::new(),
- AsyncBody::Bytes(bs) => bs,
+ let (content_length, content) = match body {
+ AsyncBody::Empty => (0, None),
+ AsyncBody::Bytes(bs) => (
+ bs.len() as u64,
+ Some(Box::new(oio::Cursor::from(bs)) as Streamer),
+ ),
AsyncBody::Stream(_) => panic!("multipart request can't contain
stream body"),
};
@@ -301,6 +427,7 @@ impl MixedPart {
),
version: parts.version,
headers: parts.headers,
+ content_length,
content,
method: Some(parts.method),
@@ -317,12 +444,11 @@ impl MixedPart {
// Swap headers directly instead of copy the entire map.
mem::swap(builder.headers_mut().unwrap(), &mut self.headers);
- let bs: Bytes = self.content;
- let length = bs.len();
- let body = IncomingAsyncBody::new(
- Box::new(oio::into_stream(stream::iter(vec![Ok(bs)]))),
- Some(length as u64),
- );
+ let body = if let Some(stream) = self.content {
+ IncomingAsyncBody::new(stream, Some(self.content_length))
+ } else {
+
IncomingAsyncBody::new(Box::new(oio::into_stream(stream::empty())), Some(0))
+ };
builder
.body(body)
@@ -355,15 +481,26 @@ impl MixedPart {
/// Set the content for this part.
pub fn content(mut self, content: impl Into<Bytes>) -> Self {
- self.content = content.into();
+ let content = content.into();
+
+ self.content_length = content.len() as u64;
+ self.content = Some(Box::new(oio::Cursor::from(content)));
+ self
+ }
+
+ /// Set the stream content for this part.
+ pub fn stream(mut self, size: u64, content: Streamer) -> Self {
+ self.content_length = size;
+ self.content = Some(content);
self
}
}
impl Part for MixedPart {
const TYPE: &'static str = "mixed";
+ type STREAM = MixedPartStream;
- fn format(&self) -> Bytes {
+ fn format(self) -> (u64, Self::STREAM) {
let mut bs = BytesMut::new();
// Write parts headers.
@@ -420,15 +557,24 @@ impl Part for MixedPart {
bs.extend_from_slice(v.as_bytes());
bs.extend_from_slice(b"\r\n");
}
-
- // Write content.
bs.extend_from_slice(b"\r\n");
- if !self.content.is_empty() {
- bs.extend_from_slice(&self.content);
- bs.extend_from_slice(b"\r\n");
+
+ let bs = bs.freeze();
+
+ // pre-content + content + post-content;
+ let mut total_size = bs.len() as u64;
+
+ if self.content.is_some() {
+ total_size += self.content_length + 2;
}
- bs.freeze()
+ (
+ total_size,
+ MixedPartStream {
+ pre_content: Some(bs),
+ content: self.content,
+ },
+ )
}
/// TODO
@@ -465,6 +611,7 @@ impl Part for MixedPart {
let parts = http_response.split("\r\n\r\n").collect::<Vec<&str>>();
let headers_content = parts[0];
let body_content = parts.get(1).unwrap_or(&"");
+ let body_bytes = Bytes::from(body_content.to_string());
let status_line = headers_content.lines().next().unwrap_or("");
let status_code = status_line
@@ -501,7 +648,8 @@ impl Part for MixedPart {
part_headers,
version: Version::HTTP_11,
headers,
- content: Bytes::from(body_content.to_string()),
+ content_length: body_bytes.len() as u64,
+ content: Some(Box::new(oio::Cursor::from(body_bytes))),
method: None,
uri: None,
@@ -517,21 +665,58 @@ impl Part for MixedPart {
}
}
+pub struct MixedPartStream {
+ /// Including headers and the first `b\r\n`
+ pre_content: Option<Bytes>,
+ content: Option<Streamer>,
+}
+
+impl Stream for MixedPartStream {
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ if let Some(pre_content) = self.pre_content.take() {
+ return Poll::Ready(Some(Ok(pre_content)));
+ }
+
+ if let Some(stream) = self.content.as_mut() {
+ return match ready!(stream.poll_next(cx)) {
+ None => {
+ self.content = None;
+ Poll::Ready(Some(Ok(Bytes::from_static(b"\r\n"))))
+ }
+ Some(v) => Poll::Ready(Some(v)),
+ };
+ }
+
+ Poll::Ready(None)
+ }
+
+ /// It's possible to implement reset by calling stream's `poll_reset`.
+ fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
+ Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "MixedPartStream doesn't support reset yet",
+ )))
+ }
+}
+
#[cfg(test)]
mod tests {
use http::header::CONTENT_TYPE;
use pretty_assertions::assert_eq;
use super::*;
+ use crate::raw::oio::StreamExt;
- #[test]
- fn test_multipart_formdata_basic() {
+ #[tokio::test]
+ async fn test_multipart_formdata_basic() -> Result<()> {
let multipart = Multipart::new()
.with_boundary("lalala")
.part(FormDataPart::new("foo").content(Bytes::from("bar")))
.part(FormDataPart::new("hello").content(Bytes::from("world")));
- let body = multipart.build();
+ let (size, body) = multipart.build();
+ let bs = body.collect().await.unwrap();
+ assert_eq!(size, bs.len() as u64);
let expected = "--lalala\r\n\
content-disposition: form-data; name=\"foo\"\r\n\
@@ -543,12 +728,13 @@ mod tests {
world\r\n\
--lalala--\r\n";
- assert_eq!(expected, String::from_utf8(body.to_vec()).unwrap());
+ assert_eq!(Bytes::from(expected), bs);
+ Ok(())
}
/// This test is inspired by
<https://docs.aws.amazon.com/AmazonS3/latest/userguide/HTTPPOSTExamples.html>
- #[test]
- fn test_multipart_formdata_s3_form_upload() {
+ #[tokio::test]
+ async fn test_multipart_formdata_s3_form_upload() -> Result<()> {
let multipart = Multipart::new()
.with_boundary("9431149156168")
.part(FormDataPart::new("key").content("user/eric/MyPicture.jpg"))
@@ -564,7 +750,9 @@ mod tests {
.part(FormDataPart::new("Signature").content("0RavWzkygo6QX9caELEqKi9kDbU="))
.part(FormDataPart::new("file").header(CONTENT_TYPE,
"image/jpeg".parse().unwrap()).content("...file
content...")).part(FormDataPart::new("submit").content("Upload to Amazon S3"));
- let body = multipart.build();
+ let (size, body) = multipart.build();
+ let bs = body.collect().await?;
+ assert_eq!(size, bs.len() as u64);
let expected = r#"--9431149156168
content-disposition: form-data; name="key"
@@ -618,15 +806,17 @@ Upload to Amazon S3
expected,
// Rust can't represent `\r` in a string literal, so we
// replace `\r\n` with `\n` for comparison
- String::from_utf8(body.to_vec())
+ String::from_utf8(bs.to_vec())
.unwrap()
.replace("\r\n", "\n")
);
+
+ Ok(())
}
/// This test is inspired by <https://cloud.google.com/storage/docs/batch>
- #[test]
- fn test_multipart_mixed_gcs_batch_metadata() {
+ #[tokio::test]
+ async fn test_multipart_mixed_gcs_batch_metadata() -> Result<()> {
let multipart = Multipart::new()
.with_boundary("===============7330845974216740156==")
.part(
@@ -684,7 +874,9 @@ Upload to Amazon S3
.content(r#"{"metadata": {"type": "calico"}}"#),
);
- let body = multipart.build();
+ let (size, body) = multipart.build();
+ let bs = body.collect().await?;
+ assert_eq!(size, bs.len() as u64);
let expected = r#"--===============7330845974216740156==
Content-Type: application/http
@@ -726,15 +918,17 @@ content-length: 32
expected,
// Rust can't represent `\r` in a string literal, so we
// replace `\r\n` with `\n` for comparison
- String::from_utf8(body.to_vec())
+ String::from_utf8(bs.to_vec())
.unwrap()
.replace("\r\n", "\n")
);
+
+ Ok(())
}
/// This test is inspired by
<https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=azure-ad>
- #[test]
- fn test_multipart_mixed_azblob_batch_delete() {
+ #[tokio::test]
+ async fn test_multipart_mixed_azblob_batch_delete() -> Result<()> {
let multipart = Multipart::new()
.with_boundary("batch_357de4f7-6d0b-4e02-8cd2-6361411a9525")
.part(
@@ -786,7 +980,9 @@ content-length: 32
.header("content-length".parse().unwrap(),
"0".parse().unwrap()),
);
- let body = multipart.build();
+ let (size, body) = multipart.build();
+ let bs = body.collect().await?;
+ assert_eq!(size, bs.len() as u64);
let expected = r#"--batch_357de4f7-6d0b-4e02-8cd2-6361411a9525
Content-Type: application/http
@@ -825,10 +1021,12 @@ content-length: 0
expected,
// Rust can't represent `\r` in a string literal, so we
// replace `\r\n` with `\n` for comparison
- String::from_utf8(body.to_vec())
+ String::from_utf8(bs.to_vec())
.unwrap()
.replace("\r\n", "\n")
);
+
+ Ok(())
}
/// This test is inspired by <https://cloud.google.com/storage/docs/batch>
@@ -883,137 +1081,137 @@ Content-Length: 846
.parse(Bytes::from(response))
.unwrap();
+ let part0_bs = Bytes::from_static(
+ r#"{"kind": "storage#object","id":
"example-bucket/obj1/1495822576643790","metadata": {"type":
"tabby"}}"#.as_bytes());
+ let part1_bs = Bytes::from_static(
+ r#"{"kind": "storage#object","id":
"example-bucket/obj2/1495822576643790","metadata": {"type": "tuxedo"}}"#
+ .as_bytes()
+ );
+ let part2_bs = Bytes::from_static(
+ r#"{"kind": "storage#object","id":
"example-bucket/obj3/1495822576643790","metadata": {"type": "calico"}}"#
+ .as_bytes()
+ );
+
assert_eq!(multipart.parts.len(), 3);
+
+ assert_eq!(multipart.parts[0].part_headers, {
+ let mut h = HeaderMap::new();
+ h.insert("Content-Type", "application/http".parse().unwrap());
+ h.insert(
+ "Content-ID",
+ "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+1>"
+ .parse()
+ .unwrap(),
+ );
+
+ h
+ });
+ assert_eq!(multipart.parts[0].version, Version::HTTP_11);
+ assert_eq!(multipart.parts[0].headers, {
+ let mut h = HeaderMap::new();
+ h.insert(
+ "ETag",
+ "\"lGaP-E0memYDumK16YuUDM_6Gf0/V43j6azD55CPRGb9b6uytDYl61Y\""
+ .parse()
+ .unwrap(),
+ );
+ h.insert(
+ "Content-Type",
+ "application/json; charset=UTF-8".parse().unwrap(),
+ );
+ h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
+ h.insert("Expires", "Mon, 22 Jan 2018 18:56:00
GMT".parse().unwrap());
+ h.insert("Cache-Control", "private, max-age=0".parse().unwrap());
+ h.insert("Content-Length", "846".parse().unwrap());
+
+ h
+ });
+ assert_eq!(multipart.parts[0].content_length, part0_bs.len() as u64);
+ assert_eq!(multipart.parts[0].uri, None);
+ assert_eq!(multipart.parts[0].method, None);
assert_eq!(
- multipart.parts[0],
- MixedPart {
- part_headers: {
- let mut h = HeaderMap::new();
- h.insert("Content-Type",
"application/http".parse().unwrap());
- h.insert(
- "Content-ID",
- "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+1>"
- .parse()
- .unwrap(),
- );
-
- h
- },
- version: Version::HTTP_11,
- headers: {
- let mut h = HeaderMap::new();
- h.insert(
- "ETag",
-
"\"lGaP-E0memYDumK16YuUDM_6Gf0/V43j6azD55CPRGb9b6uytDYl61Y\""
- .parse()
- .unwrap(),
- );
- h.insert(
- "Content-Type",
- "application/json; charset=UTF-8".parse().unwrap(),
- );
- h.insert("Date", "Mon, 22 Jan 2018 18:56:00
GMT".parse().unwrap());
- h.insert("Expires", "Mon, 22 Jan 2018 18:56:00
GMT".parse().unwrap());
- h.insert("Cache-Control", "private,
max-age=0".parse().unwrap());
- h.insert("Content-Length", "846".parse().unwrap());
-
- h
- },
- content: Bytes::from_static(
- r#"{"kind": "storage#object","id":
"example-bucket/obj1/1495822576643790","metadata": {"type": "tabby"}}"#
- .as_bytes()
- ),
- uri: None,
- method: None,
- status_code: Some(StatusCode::from_u16(200).unwrap())
- }
+ multipart.parts[0].status_code,
+ Some(StatusCode::from_u16(200).unwrap())
);
+
+ assert_eq!(multipart.parts[1].part_headers, {
+ let mut h = HeaderMap::new();
+ h.insert("Content-Type", "application/http".parse().unwrap());
+ h.insert(
+ "Content-ID",
+ "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+2>"
+ .parse()
+ .unwrap(),
+ );
+
+ h
+ });
+ assert_eq!(multipart.parts[1].version, Version::HTTP_11);
+ assert_eq!(multipart.parts[1].headers, {
+ let mut h = HeaderMap::new();
+ h.insert(
+ "ETag",
+ "\"lGaP-E0memYDumK16YuUDM_6Gf0/91POdd-sxSAkJnS8Dm7wMxBSDKk\""
+ .parse()
+ .unwrap(),
+ );
+ h.insert(
+ "Content-Type",
+ "application/json; charset=UTF-8".parse().unwrap(),
+ );
+ h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
+ h.insert("Expires", "Mon, 22 Jan 2018 18:56:00
GMT".parse().unwrap());
+ h.insert("Cache-Control", "private, max-age=0".parse().unwrap());
+ h.insert("Content-Length", "846".parse().unwrap());
+
+ h
+ });
+ assert_eq!(multipart.parts[1].content_length, part1_bs.len() as u64);
+ assert_eq!(multipart.parts[1].uri, None);
+ assert_eq!(multipart.parts[1].method, None);
assert_eq!(
- multipart.parts[1],
- MixedPart {
- part_headers: {
- let mut h = HeaderMap::new();
- h.insert("Content-Type",
"application/http".parse().unwrap());
- h.insert(
- "Content-ID",
- "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+2>"
- .parse()
- .unwrap(),
- );
-
- h
- },
- version: Version::HTTP_11,
- headers: {
- let mut h = HeaderMap::new();
- h.insert(
- "ETag",
-
"\"lGaP-E0memYDumK16YuUDM_6Gf0/91POdd-sxSAkJnS8Dm7wMxBSDKk\""
- .parse()
- .unwrap(),
- );
- h.insert(
- "Content-Type",
- "application/json; charset=UTF-8".parse().unwrap(),
- );
- h.insert("Date", "Mon, 22 Jan 2018 18:56:00
GMT".parse().unwrap());
- h.insert("Expires", "Mon, 22 Jan 2018 18:56:00
GMT".parse().unwrap());
- h.insert("Cache-Control", "private,
max-age=0".parse().unwrap());
- h.insert("Content-Length", "846".parse().unwrap());
-
- h
- },
- content: Bytes::from_static(
- r#"{"kind": "storage#object","id":
"example-bucket/obj2/1495822576643790","metadata": {"type": "tuxedo"}}"#
- .as_bytes()
- ),
- uri: None,
- method: None,
- status_code: Some(StatusCode::from_u16(200).unwrap())
- }
- );
+ multipart.parts[1].status_code,
+ Some(StatusCode::from_u16(200).unwrap())
+ );
+
+ assert_eq!(multipart.parts[2].part_headers, {
+ let mut h = HeaderMap::new();
+ h.insert("Content-Type", "application/http".parse().unwrap());
+ h.insert(
+ "Content-ID",
+ "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+3>"
+ .parse()
+ .unwrap(),
+ );
+
+ h
+ });
+ assert_eq!(multipart.parts[2].version, Version::HTTP_11);
+ assert_eq!(multipart.parts[2].headers, {
+ let mut h = HeaderMap::new();
+ h.insert(
+ "ETag",
+ "\"lGaP-E0memYDumK16YuUDM_6Gf0/d2Z1F1_ZVbB1dC0YKM9rX5VAgIQ\""
+ .parse()
+ .unwrap(),
+ );
+ h.insert(
+ "Content-Type",
+ "application/json; charset=UTF-8".parse().unwrap(),
+ );
+ h.insert("Date", "Mon, 22 Jan 2018 18:56:00 GMT".parse().unwrap());
+ h.insert("Expires", "Mon, 22 Jan 2018 18:56:00
GMT".parse().unwrap());
+ h.insert("Cache-Control", "private, max-age=0".parse().unwrap());
+ h.insert("Content-Length", "846".parse().unwrap());
+
+ h
+ });
+ assert_eq!(multipart.parts[2].content_length, part2_bs.len() as u64);
+ assert_eq!(multipart.parts[2].uri, None);
+ assert_eq!(multipart.parts[2].method, None);
assert_eq!(
- multipart.parts[2],
- MixedPart {
- part_headers: {
- let mut h = HeaderMap::new();
- h.insert("Content-Type",
"application/http".parse().unwrap());
- h.insert(
- "Content-ID",
- "<response-b29c5de2-0db4-490b-b421-6a51b598bd22+3>"
- .parse()
- .unwrap(),
- );
-
- h
- },
- version: Version::HTTP_11,
- headers: {
- let mut h = HeaderMap::new();
- h.insert(
- "ETag",
-
"\"lGaP-E0memYDumK16YuUDM_6Gf0/d2Z1F1_ZVbB1dC0YKM9rX5VAgIQ\""
- .parse()
- .unwrap(),
- );
- h.insert(
- "Content-Type",
- "application/json; charset=UTF-8".parse().unwrap(),
- );
- h.insert("Date", "Mon, 22 Jan 2018 18:56:00
GMT".parse().unwrap());
- h.insert("Expires", "Mon, 22 Jan 2018 18:56:00
GMT".parse().unwrap());
- h.insert("Cache-Control", "private,
max-age=0".parse().unwrap());
- h.insert("Content-Length", "846".parse().unwrap());
-
- h
- },
- content: Bytes::from_static(
- r#"{"kind": "storage#object","id":
"example-bucket/obj3/1495822576643790","metadata": {"type": "calico"}}"#
- .as_bytes()
- ),
- uri: None,
- method: None,
- status_code: Some(StatusCode::from_u16(200).unwrap())
- });
+ multipart.parts[2].status_code,
+ Some(StatusCode::from_u16(200).unwrap())
+ );
}
}
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index cd683138e..c9b670ead 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -30,12 +30,18 @@ use crate::raw::*;
use crate::*;
/// Cursor is the cursor for [`Bytes`] that implements [`oio::Read`]
+#[derive(Default)]
pub struct Cursor {
inner: Bytes,
pos: u64,
}
impl Cursor {
+ /// Create a new empty cursor.
+ pub fn new() -> Self {
+ Self::default()
+ }
+
/// Returns `true` if the remaining slice is empty.
pub fn is_empty(&self) -> bool {
self.pos as usize >= self.inner.len()