This is an automated email from the ASF dual-hosted git repository. junouyang pushed a commit to branch feat/blocking-operator-api-sync in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit d3582341eaa69e02eed1f19b6388d0d8ad87a445 Author: owl <[email protected]> AuthorDate: Thu Aug 24 17:05:06 2023 +0800 feat(types): synchronous blocking operator and operator's API --- core/src/types/operator/blocking_operator.rs | 41 +++++++ core/src/types/operator/operator_functions.rs | 159 +++++++++++++++++++++++++- core/src/types/operator/operator_futures.rs | 6 + 3 files changed, 204 insertions(+), 2 deletions(-) diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs index 13ea1ec56..442368dc5 100644 --- a/core/src/types/operator/blocking_operator.rs +++ b/core/src/types/operator/blocking_operator.rs @@ -608,6 +608,47 @@ impl BlockingOperator { BlockingWriter::create(self.inner().clone(), &path, op) } + /// Create a new writer with extra options + /// + /// # Examples + /// + /// ```no_run + /// # use anyhow::Result; + /// use opendal::BlockingOperator; + /// use opendal::EntryMode; + /// use opendal::Metakey; + /// # fn test(op: BlockingOperator) -> Result<()> { + /// let mut w = op.writer_with("path/to/file").call()?; + /// w.write(vec![0; 4096])?; + /// w.write(vec![1; 4096])?; + /// w.close()?; + /// # Ok(()) + /// # } + /// ``` + pub fn writer_with(&self, path: &str) -> FunctionWriter { + let path = normalize_path(path); + + FunctionWriter(OperatorFunction::new( + self.inner().clone(), + path, + OpWrite::default(), + |inner, path, args| { + let path = normalize_path(&path); + + if !validate_path(&path, EntryMode::FILE) { + return Err( + Error::new(ErrorKind::IsADirectory, "write path is a directory") + .with_operation("BlockingOperator::writer_with") + .with_context("service", inner.info().scheme().into_static()) + .with_context("path", &path), + ); + } + + BlockingWriter::create(inner.clone(), &path, args) + }, + )) + } + /// Delete given path. 
/// /// # Notes diff --git a/core/src/types/operator/operator_functions.rs b/core/src/types/operator/operator_functions.rs index ba865d9dd..49a0e7e85 100644 --- a/core/src/types/operator/operator_functions.rs +++ b/core/src/types/operator/operator_functions.rs @@ -76,7 +76,35 @@ pub struct FunctionWrite( ); impl FunctionWrite { - /// Set the content length for this operation. + /// Set the append mode of op. + /// + /// If the append mode is set, the data will be appended to the end of the file. + /// + /// # Notes + /// + /// Service could return `Unsupported` if the underlying storage does not support append. + pub fn append(mut self, v: bool) -> Self { + self.0 = self.0.map_args(|(args, bs)| (args.with_append(v), bs)); + self + } + + /// Set the buffer size of op. + /// + /// If buffer size is set, the data will be buffered by the underlying writer. + /// + /// ## NOTE + /// + /// Services could have their own minimum buffer size while performing write operations like + /// multipart uploads. So the buffer size may be larger than the given buffer size. + pub fn buffer_size(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs)); + self + } + + /// Set the content length of op. + /// + /// If the content length is not set, the content length will be + /// calculated automatically by buffering part of data. pub fn content_length(mut self, v: u64) -> Self { self.0 = self .0 @@ -84,7 +112,7 @@ impl FunctionWrite { self } - /// Set the content type for this operation. 
+ /// Set the content type of option pub fn content_type(mut self, v: &str) -> Self { self.0 = self .0 @@ -92,6 +120,22 @@ impl FunctionWrite { self } + /// Set the content disposition of option + pub fn content_disposition(mut self, v: &str) -> Self { + self.0 = self + .0 + .map_args(|(args, bs)| (args.with_content_disposition(v), bs)); + self + } + + /// Set the cache control of option + pub fn cache_control(mut self, v: &str) -> Self { + self.0 = self + .0 + .map_args(|(args, bs)| (args.with_cache_control(v), bs)); + self + } + /// Call the function to consume all the input and generate a /// result. pub fn call(self) -> Result<()> { @@ -99,6 +143,75 @@ impl FunctionWrite { } } +/// Function that is generated by [`BlockingOperator::writer_with`]. +/// +/// Users can add more options by public functions provided by this struct. +pub struct FunctionWriter( + /// The args for FunctionWriter are only the `OpWrite` options; unlike + /// `FunctionWrite`, no bytes are moved into this function. + pub(crate) OperatorFunction<OpWrite, BlockingWriter>, +); + +impl FunctionWriter { + /// Set the append mode of op. + /// + /// If the append mode is set, the data will be appended to the end of the file. + /// + /// # Notes + /// + /// Service could return `Unsupported` if the underlying storage does not support append. + pub fn append(mut self, v: bool) -> Self { + self.0 = self.0.map_args(|args| args.with_append(v)); + self + } + + /// Set the buffer size of op. + /// + /// If buffer size is set, the data will be buffered by the underlying writer. + /// + /// ## NOTE + /// + /// Services could have their own minimum buffer size while performing write operations like + /// multipart uploads. So the buffer size may be larger than the given buffer size. + pub fn buffer_size(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|args| args.with_buffer_size(v)); + self + } + + /// Set the content length of op. 
+ /// + /// If the content length is not set, the content length will be + /// calculated automatically by buffering part of data. + pub fn content_length(mut self, v: u64) -> Self { + self.0 = self.0.map_args(|args| args.with_content_length(v)); + self + } + + /// Set the content type of option + pub fn content_type(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_content_type(v)); + self + } + + /// Set the content disposition of option + pub fn content_disposition(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_content_disposition(v)); + self + } + + /// Set the cache control of option + pub fn cache_control(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_cache_control(v)); + self + } + + /// Call the function to consume all the input and generate a + /// result. + pub fn call(self) -> Result<BlockingWriter> { + self.0.call() + } +} + /// Function that generated by [`BlockingOperator::delete_with`]. /// /// Users can add more options by public functions provided by this struct. @@ -223,6 +336,48 @@ impl FunctionReader { self } + /// Sets the content-disposition header that should be sent back by the remote read operation. + pub fn override_content_disposition(mut self, content_disposition: &str) -> Self { + self.0 = self + .0 + .map_args(|args| args.with_override_content_disposition(content_disposition)); + self + } + + /// Sets the cache-control header that should be sent back by the remote read operation. + pub fn override_cache_control(mut self, cache_control: &str) -> Self { + self.0 = self + .0 + .map_args(|args| args.with_override_cache_control(cache_control)); + self + } + + /// Sets the content-type header that should be sent back by the remote read operation. + pub fn override_content_type(mut self, content_type: &str) -> Self { + self.0 = self + .0 + .map_args(|args| args.with_override_content_type(content_type)); + self + } + + /// Set the If-Match for this operation. 
+ pub fn if_match(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_if_match(v)); + self + } + + /// Set the If-None-Match for this operation. + pub fn if_none_match(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_if_none_match(v)); + self + } + + /// Set the version for this operation. + pub fn version(mut self, v: &str) -> Self { + self.0 = self.0.map_args(|args| args.with_version(v)); + self + } + /// Call the function to consume all the input and generate a /// result. pub fn call(self) -> Result<BlockingReader> { diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index 10a9c061c..7f2df677a 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -546,6 +546,12 @@ impl Future for FutureDelete { pub struct FutureList(pub(crate) OperatorFuture<OpList, Vec<Entry>>); impl FutureList { + /// Change the limit of this list operation. + pub fn limit(mut self, v: usize) -> Self { + self.0 = self.0.map_args(|args| args.with_limit(v)); + self + } + /// Change the start_after of this list operation. pub fn start_after(mut self, v: &str) -> Self { self.0 = self.0.map_args(|args| args.with_start_after(v));
