This is an automated email from the ASF dual-hosted git repository.

junouyang pushed a commit to branch feat/blocking-operator-api-sync
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit d3582341eaa69e02eed1f19b6388d0d8ad87a445
Author: owl <[email protected]>
AuthorDate: Thu Aug 24 17:05:06 2023 +0800

    feat(types): synchronous blocking operator and operator's API
---
 core/src/types/operator/blocking_operator.rs  |  41 +++++++
 core/src/types/operator/operator_functions.rs | 159 +++++++++++++++++++++++++-
 core/src/types/operator/operator_futures.rs   |   6 +
 3 files changed, 204 insertions(+), 2 deletions(-)

diff --git a/core/src/types/operator/blocking_operator.rs 
b/core/src/types/operator/blocking_operator.rs
index 13ea1ec56..442368dc5 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -608,6 +608,47 @@ impl BlockingOperator {
         BlockingWriter::create(self.inner().clone(), &path, op)
     }
 
+    /// Create a new writer with extra options.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// # use anyhow::Result;
+    /// use opendal::BlockingOperator;
+    /// use opendal::EntryMode;
+    /// use opendal::Metakey;
+    /// # fn test(op: BlockingOperator) -> Result<()> {
+    /// let mut w = op.writer_with("path/to/file").call()?;
+    /// w.write(vec![0; 4096])?;
+    /// w.write(vec![1; 4096])?;
+    /// w.close()?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn writer_with(&self, path: &str) -> FunctionWriter {
+        let path = normalize_path(path);
+
+        FunctionWriter(OperatorFunction::new(
+            self.inner().clone(),
+            path,
+            OpWrite::default(),
+            |inner, path, args| {
+                let path = normalize_path(&path);
+
+                if !validate_path(&path, EntryMode::FILE) {
+                    return Err(
+                        Error::new(ErrorKind::IsADirectory, "write path is a 
directory")
+                            .with_operation("BlockingOperator::writer_with")
+                            .with_context("service", 
inner.info().scheme().into_static())
+                            .with_context("path", &path),
+                    );
+                }
+        
+                BlockingWriter::create(inner.clone(), &path, args)
+            },
+        ))
+    }
+
     /// Delete given path.
     ///
     /// # Notes
diff --git a/core/src/types/operator/operator_functions.rs 
b/core/src/types/operator/operator_functions.rs
index ba865d9dd..49a0e7e85 100644
--- a/core/src/types/operator/operator_functions.rs
+++ b/core/src/types/operator/operator_functions.rs
@@ -76,7 +76,35 @@ pub struct FunctionWrite(
 );
 
 impl FunctionWrite {
-    /// Set the content length for this operation.
+    /// Set the append mode of op.
+    ///
+    /// If the append mode is set, the data will be appended to the end of the 
file.
+    ///
+    /// # Notes
+    ///
+    /// Service could return `Unsupported` if the underlying storage does not 
support append.
+    pub fn append(mut self, v: bool) -> Self {
+        self.0 = self.0.map_args(|(args, bs)| (args.with_append(v), bs));
+        self
+    }
+
+    /// Set the buffer size of op.
+    ///
+    /// If buffer size is set, the data will be buffered by the underlying 
writer.
+    ///
+    /// ## NOTE
+    ///
+    /// Services could have their own minimum buffer size while performing write 
operations like
+    /// multipart uploads. So the buffer size may be larger than the given 
buffer size.
+    pub fn buffer_size(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|(args, bs)| (args.with_buffer_size(v), bs));
+        self
+    }
+
+    /// Set the content length of op.
+    ///
+    /// If the content length is not set, the content length will be
+    /// calculated automatically by buffering part of data.
     pub fn content_length(mut self, v: u64) -> Self {
         self.0 = self
             .0
@@ -84,7 +112,7 @@ impl FunctionWrite {
         self
     }
 
-    /// Set the content type for this operation.
+    /// Set the content type of option
     pub fn content_type(mut self, v: &str) -> Self {
         self.0 = self
             .0
@@ -92,6 +120,22 @@ impl FunctionWrite {
         self
     }
 
+    /// Set the content disposition of option
+    pub fn content_disposition(mut self, v: &str) -> Self {
+        self.0 = self
+            .0
+            .map_args(|(args, bs)| (args.with_content_disposition(v), bs));
+        self
+    }
+
+    /// Set the cache control of option
+    pub fn cache_control(mut self, v: &str) -> Self {
+        self.0 = self
+            .0
+            .map_args(|(args, bs)| (args.with_cache_control(v), bs));
+        self
+    }
+
     /// Call the function to consume all the input and generate a
     /// result.
     pub fn call(self) -> Result<()> {
@@ -99,6 +143,75 @@ impl FunctionWrite {
     }
 }
 
+/// Function that generated by [`BlockingOperator::writer_with`].
+///
+/// Users can add more options by public functions provided by this struct.
+pub struct FunctionWriter(
+    /// The args for FunctionWriter are a plain `OpWrite`: unlike `FunctionWrite`,
+    /// no bytes are moved into this function.
+    pub(crate) OperatorFunction<OpWrite, BlockingWriter>,
+);
+
+impl FunctionWriter {
+    /// Set the append mode of op.
+    ///
+    /// If the append mode is set, the data will be appended to the end of the 
file.
+    ///
+    /// # Notes
+    ///
+    /// Service could return `Unsupported` if the underlying storage does not 
support append.
+    pub fn append(mut self, v: bool) -> Self {
+        self.0 = self.0.map_args(|args| args.with_append(v));
+        self
+    }
+
+    /// Set the buffer size of op.
+    ///
+    /// If buffer size is set, the data will be buffered by the underlying 
writer.
+    ///
+    /// ## NOTE
+    ///
+    /// Services could have their own minimum buffer size while performing write 
operations like
+    /// multipart uploads. So the buffer size may be larger than the given 
buffer size.
+    pub fn buffer_size(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|args| args.with_buffer_size(v));
+        self
+    }
+
+    /// Set the content length of op.
+    ///
+    /// If the content length is not set, the content length will be
+    /// calculated automatically by buffering part of data.
+    pub fn content_length(mut self, v: u64) -> Self {
+        self.0 = self.0.map_args(|args| args.with_content_length(v));
+        self
+    }
+
+    /// Set the content type of option
+    pub fn content_type(mut self, v: &str) -> Self {
+        self.0 = self.0.map_args(|args| args.with_content_type(v));
+        self
+    }
+
+    /// Set the content disposition of option
+    pub fn content_disposition(mut self, v: &str) -> Self {
+        self.0 = self.0.map_args(|args| args.with_content_disposition(v));
+        self
+    }
+
+    /// Set the cache control of option
+    pub fn cache_control(mut self, v: &str) -> Self {
+        self.0 = self.0.map_args(|args| args.with_cache_control(v));
+        self
+    }
+
+    /// Call the function to consume all the input and generate a
+    /// result.
+    pub fn call(self) -> Result<BlockingWriter> {
+        self.0.call()
+    }
+}
+
 /// Function that generated by [`BlockingOperator::delete_with`].
 ///
 /// Users can add more options by public functions provided by this struct.
@@ -223,6 +336,48 @@ impl FunctionReader {
         self
     }
 
+    /// Sets the content-disposition header that should be sent back by the 
remote read operation.
+    pub fn override_content_disposition(mut self, content_disposition: &str) 
-> Self {
+        self.0 = self
+            .0
+            .map_args(|args| 
args.with_override_content_disposition(content_disposition));
+        self
+    }
+
+    /// Sets the cache-control header that should be sent back by the remote 
read operation.
+    pub fn override_cache_control(mut self, cache_control: &str) -> Self {
+        self.0 = self
+            .0
+            .map_args(|args| args.with_override_cache_control(cache_control));
+        self
+    }
+
+    /// Sets the content-type header that should be sent back by the remote 
read operation.
+    pub fn override_content_type(mut self, content_type: &str) -> Self {
+        self.0 = self
+            .0
+            .map_args(|args| args.with_override_content_type(content_type));
+        self
+    }
+
+    /// Set the If-Match for this operation.
+    pub fn if_match(mut self, v: &str) -> Self {
+        self.0 = self.0.map_args(|args| args.with_if_match(v));
+        self
+    }
+
+    /// Set the If-None-Match for this operation.
+    pub fn if_none_match(mut self, v: &str) -> Self {
+        self.0 = self.0.map_args(|args| args.with_if_none_match(v));
+        self
+    }
+
+    /// Set the version for this operation.
+    pub fn version(mut self, v: &str) -> Self {
+        self.0 = self.0.map_args(|args| args.with_version(v));
+        self
+    }
+
     /// Call the function to consume all the input and generate a
     /// result.
     pub fn call(self) -> Result<BlockingReader> {
diff --git a/core/src/types/operator/operator_futures.rs 
b/core/src/types/operator/operator_futures.rs
index 10a9c061c..7f2df677a 100644
--- a/core/src/types/operator/operator_futures.rs
+++ b/core/src/types/operator/operator_futures.rs
@@ -546,6 +546,12 @@ impl Future for FutureDelete {
 pub struct FutureList(pub(crate) OperatorFuture<OpList, Vec<Entry>>);
 
 impl FutureList {
+    /// Change the limit of this list operation.
+    pub fn limit(mut self, v: usize) -> Self {
+        self.0 = self.0.map_args(|args| args.with_limit(v));
+        self
+    }
+
     /// Change the start_after of this list operation.
     pub fn start_after(mut self, v: &str) -> Self {
         self.0 = self.0.map_args(|args| args.with_start_after(v));

Reply via email to