This is an automated email from the ASF dual-hosted git repository.

meteorgan pushed a commit to branch compfs in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 2ef86990c21b0959296e4e12d5a47896326d6e29 Author: meteorgan <[email protected]> AuthorDate: Thu Nov 6 23:14:31 2025 +0800 feat(services/comfs): implement IoVectoredBuf for Buffer --- core/src/services/compfs/core.rs | 46 +++++--------------------------------- core/src/services/compfs/writer.rs | 9 +------- core/tests/behavior/async_write.rs | 39 +++++++++++++++++++++++++++++--- 3 files changed, 43 insertions(+), 51 deletions(-) diff --git a/core/src/services/compfs/core.rs b/core/src/services/compfs/core.rs index 4f8dcec42..6151ac8e7 100644 --- a/core/src/services/compfs/core.rs +++ b/core/src/services/compfs/core.rs @@ -19,7 +19,7 @@ use std::future::Future; use std::path::PathBuf; use std::sync::Arc; -use compio::buf::IoBuf; +use compio::buf::{IoBuf, IoBuffer, IoVectoredBuf}; use compio::dispatcher::Dispatcher; use crate::raw::*; @@ -81,45 +81,11 @@ impl CompfsCore { } } -// TODO: impl IoVectoredBuf for Buffer -// impl IoVectoredBuf for Buffer { -// fn as_dyn_bufs(&self) -> impl Iterator<Item = &dyn IoBuf> {} -// -// fn owned_iter(self) -> Result<OwnedIter<impl OwnedIterator<Inner = Self>>, Self> { -// Ok(OwnedIter::new(BufferIter { -// current: self.current(), -// buf: self, -// })) -// } -// } - -// #[derive(Debug, Clone)] -// struct BufferIter { -// buf: Buffer, -// current: Bytes, -// } - -// impl IntoInner for BufferIter { -// type Inner = Buffer; -// -// fn into_inner(self) -> Self::Inner { -// self.buf -// } -// } - -// impl OwnedIterator for BufferIter { -// fn next(mut self) -> Result<Self, Self::Inner> { -// let Some(current) = self.buf.next() else { -// return Err(self.buf); -// }; -// self.current = current; -// Ok(self) -// } -// -// fn current(&self) -> &dyn IoBuf { -// &self.current -// } -// } +impl IoVectoredBuf for Buffer { + unsafe fn iter_io_buffer(&self) -> impl Iterator<Item = IoBuffer> { + self.clone().map(|b| unsafe { b.as_io_buffer() }) + } +} #[cfg(test)] mod tests { diff --git a/core/src/services/compfs/writer.rs 
b/core/src/services/compfs/writer.rs index e4573f157..50269efd2 100644 --- a/core/src/services/compfs/writer.rs +++ b/core/src/services/compfs/writer.rs @@ -42,11 +42,6 @@ impl CompfsWriter { } impl oio::Write for CompfsWriter { - /// FIXME - /// - /// the write_all doesn't work correctly if `bs` is non-contiguous. - /// - /// The IoBuf::buf_len() only returns the length of the current buffer. async fn write(&mut self, bs: Buffer) -> Result<()> { let Some(mut file) = self.file.clone() else { return Err(Error::new(ErrorKind::Unexpected, "file has closed")); @@ -55,9 +50,7 @@ impl oio::Write for CompfsWriter { let pos = self .core .exec(move || async move { - for b in bs { - buf_try!(@try file.write_all(b).await); - } + buf_try!(@try file.write_vectored_all(bs).await); Ok(file.position()) }) .await?; diff --git a/core/tests/behavior/async_write.rs b/core/tests/behavior/async_write.rs index b98675897..f1264a559 100644 --- a/core/tests/behavior/async_write.rs +++ b/core/tests/behavior/async_write.rs @@ -59,7 +59,8 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) { test_writer_abort_with_concurrent, test_writer_futures_copy, test_writer_futures_copy_with_concurrent, - test_writer_return_metadata + test_writer_return_metadata, + test_writer_write_non_contiguous_data )) } @@ -758,7 +759,7 @@ pub async fn test_write_with_if_none_match(op: Operator) -> Result<()> { Ok(()) } -/// Write an file with if_not_exists will get a ConditionNotMatch error if file exists. +/// Write a file with if_not_exists will get a ConditionNotMatch error if file exists. pub async fn test_write_with_if_not_exists(op: Operator) -> Result<()> { if !op.info().full_capability().write_with_if_not_exists { return Ok(()); @@ -782,7 +783,7 @@ pub async fn test_write_with_if_not_exists(op: Operator) -> Result<()> { Ok(()) } -/// Write an file with if_match will get a ConditionNotMatch error if file's etag does not match. 
+/// Write a file with if_match will get a ConditionNotMatch error if file's etag does not match. pub async fn test_write_with_if_match(op: Operator) -> Result<()> { if !op.info().full_capability().write_with_if_match { return Ok(()); @@ -819,3 +820,35 @@ pub async fn test_write_with_if_match(op: Operator) -> Result<()> { Ok(()) } + +pub async fn test_writer_write_non_contiguous_data(op: Operator) -> Result<()> { + let path = TEST_FIXTURE.new_file_path(); + let size = 5 * 1024 * 1024; // write file with 5 MiB + let content_a = gen_fixed_bytes(size); + let digest_a = Sha256::digest(&content_a); + let content_b = gen_fixed_bytes(size); + let digest_b = Sha256::digest(&content_b); + + let mut w = op.writer(&path).await?; + w.write(vec![Bytes::from(content_a), Bytes::from(content_b)]) + .await?; + w.close().await?; + + let meta = op.stat(&path).await.expect("stat must succeed"); + assert_eq!(meta.content_length(), (size * 2) as u64); + + let bs = op.read(&path).await?.to_bytes(); + assert_eq!(bs.len(), size * 2, "read size"); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[..size])), + format!("{:x}", digest_a), + "read content a" + ); + assert_eq!( + format!("{:x}", Sha256::digest(&bs[size..])), + format!("{:x}", digest_b), + "read content b" + ); + + Ok(()) +}
