This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new d30763385 bug: write operation won't split payload into chunks
following configuration (#6796)
d30763385 is described below
commit d30763385450ae3305892312864f2551a521008f
Author: ChenChen Lai <[email protected]>
AuthorDate: Sat Nov 29 21:01:32 2025 +0800
bug: write operation won't split payload into chunks following
configuration (#6796)
* bug: write operation won't split payload into chunks following
configuration
* try to fix error
* remove operator add unitest
* fix clippy
* Update core/src/types/context/write.rs
---------
Co-authored-by: Xuanwo <[email protected]>
---
core/src/types/context/write.rs | 134 ++++++++++++++++++++++++++++++++++--
core/src/types/operator/operator.rs | 1 +
2 files changed, 129 insertions(+), 6 deletions(-)
diff --git a/core/src/types/context/write.rs b/core/src/types/context/write.rs
index cc6c7720c..11a8551dd 100644
--- a/core/src/types/context/write.rs
+++ b/core/src/types/context/write.rs
@@ -223,11 +223,16 @@ mod tests {
struct MockWriter {
buf: Arc<Mutex<Vec<u8>>>,
+ write_sizes: Arc<Mutex<Vec<usize>>>,
}
impl Write for MockWriter {
async fn write(&mut self, bs: Buffer) -> Result<()> {
- debug!("test_fuzz_exact_buf_writer: flush size: {}", &bs.len());
+ let size = bs.len();
+ debug!("test_fuzz_exact_buf_writer: flush size: {}", size);
+
+ let mut write_sizes = self.write_sizes.lock().await;
+ write_sizes.push(size);
let mut buf = self.buf.lock().await;
buf.put(bs);
@@ -256,7 +261,14 @@ mod tests {
rng.fill_bytes(&mut expected);
let buf = Arc::new(Mutex::new(vec![]));
- let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), true);
+ let mut w = WriteGenerator::new(
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: Arc::new(Mutex::new(vec![])),
+ }),
+ Some(10),
+ true,
+ );
let mut bs = Bytes::from(expected.clone());
while !bs.is_empty() {
@@ -284,7 +296,14 @@ mod tests {
.try_init();
let buf = Arc::new(Mutex::new(vec![]));
- let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), false);
+ let mut w = WriteGenerator::new(
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: Arc::new(Mutex::new(vec![])),
+ }),
+ Some(10),
+ false,
+ );
let mut rng = thread_rng();
let mut expected = vec![0; 15];
@@ -315,7 +334,14 @@ mod tests {
.try_init();
let buf = Arc::new(Mutex::new(vec![]));
- let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), false);
+ let mut w = WriteGenerator::new(
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: Arc::new(Mutex::new(vec![])),
+ }),
+ Some(10),
+ false,
+ );
let mut rng = thread_rng();
let mut expected = vec![];
@@ -358,7 +384,14 @@ mod tests {
.try_init();
let buf = Arc::new(Mutex::new(vec![]));
- let mut w = WriteGenerator::new(Box::new(MockWriter { buf: buf.clone() }), Some(10), false);
+ let mut w = WriteGenerator::new(
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: Arc::new(Mutex::new(vec![])),
+ }),
+ Some(10),
+ false,
+ );
let mut rng = thread_rng();
let mut expected = vec![];
@@ -408,7 +441,10 @@ mod tests {
let buf = Arc::new(Mutex::new(vec![]));
let buffer_size = rng.gen_range(1..10);
let mut writer = WriteGenerator::new(
- Box::new(MockWriter { buf: buf.clone() }),
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: Arc::new(Mutex::new(vec![])),
+ }),
Some(buffer_size),
true,
);
@@ -438,4 +474,90 @@ mod tests {
);
Ok(())
}
+
+ /// Test that when writing a large buffer in exact mode, it gets split into chunks.
+ ///
+ /// This test verifies that WriteGenerator correctly handles large buffers by
+ /// splitting them into chunks of the configured chunk size, rather than writing
+ /// everything at once.
+ #[tokio::test]
+ async fn test_exact_buf_writer_large_buffer_splits_into_chunks() -> Result<()> {
+ let _ = tracing_subscriber::fmt()
+ .pretty()
+ .with_test_writer()
+ .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
+ .try_init();
+
+ let chunk_size = 10;
+ let large_buffer_size = 25; // 2.5x chunk_size
+
+ let buf = Arc::new(Mutex::new(vec![]));
+ let write_sizes = Arc::new(Mutex::new(vec![]));
+ let mut writer = WriteGenerator::new(
+ Box::new(MockWriter {
+ buf: buf.clone(),
+ write_sizes: write_sizes.clone(),
+ }),
+ Some(chunk_size),
+ true, // exact mode
+ );
+
+ let mut rng = thread_rng();
+ let mut expected = vec![0; large_buffer_size];
+ rng.fill_bytes(&mut expected);
+
+ let bs = Bytes::from(expected.clone());
+ // In exact mode, a large buffer should be written in chunks.
+ // We need to call write multiple times until all data is written.
+ let mut remaining = bs.clone();
+ while !remaining.is_empty() {
+ let n = writer.write(remaining.clone().into()).await?;
+ remaining.advance(n);
+ }
+
+ writer.close().await?;
+
+ // Verify all data was written
+ let buf = buf.lock().await;
+ assert_eq!(buf.len(), expected.len());
+ assert_eq!(
+ format!("{:x}", Sha256::digest(&*buf)),
+ format!("{:x}", Sha256::digest(&expected))
+ );
+
+ // Verify that writes were split into chunks (except possibly the last one)
+ let write_sizes = write_sizes.lock().await;
+ // In exact mode, all writes except the last should be exactly chunk_size
+ // The last write might be smaller if there's a remainder
+ for (i, &size) in write_sizes.iter().enumerate() {
+ if i < write_sizes.len() - 1 {
+ // All writes except the last should be exactly chunk_size
+ assert_eq!(
+ size, chunk_size,
+ "Write {} should be exactly chunk_size {}, but was {}",
+ i, chunk_size, size
+ );
+ } else {
+ // Last write should be <= chunk_size
+ assert!(
+ size <= chunk_size,
+ "Last write should be <= chunk_size {}, but was {}",
+ chunk_size,
+ size
+ );
+ }
+ }
+
+ // Verify we got the expected number of writes
+ let expected_writes = large_buffer_size.div_ceil(chunk_size);
+ assert_eq!(
+ write_sizes.len(),
+ expected_writes,
+ "Expected {} writes, but got {}",
+ expected_writes,
+ write_sizes.len()
+ );
+
+ Ok(())
+ }
}
diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs
index 82c722e76..347dece2d 100644
--- a/core/src/types/operator/operator.rs
+++ b/core/src/types/operator/operator.rs
@@ -849,6 +849,7 @@ impl Operator {
}
let (args, opts) = opts.into();
+
let context = WriteContext::new(acc, path, args, opts);
let mut w = Writer::new(context).await?;
w.write(bs).await?;