This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new a54e44291 feat(io): Add delete_stream to Storage trait (#2216)
a54e44291 is described below
commit a54e44291935c42bfd3da254f7fb521b3f7f3c05
Author: Shawn Chang <[email protected]>
AuthorDate: Mon Mar 16 18:57:37 2026 -0700
feat(io): Add delete_stream to Storage trait (#2216)
## Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes #123` indicates that this PR will close issue #123.
-->
- Closes #2065
## What changes are included in this PR?
- Add `delete_stream` to `Storage` trait to support batch delete
- Expose `delete_stream` in `FileIO` as well
<!--
Provide a summary of the modifications in this PR. List the main changes
such as new features, bug fixes, refactoring, or any other updates.
-->
## Are these changes tested?
Added unit tests
Added integration tests for opendal
<!--
Specify what test covers (unit test, integration test, etc.).
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
---
Cargo.lock | 1 +
crates/iceberg/src/io/file_io.rs | 13 +
crates/iceberg/src/io/storage/local_fs.rs | 66 +++++
crates/iceberg/src/io/storage/memory.rs | 61 +++++
crates/iceberg/src/io/storage/mod.rs | 4 +
crates/storage/opendal/Cargo.toml | 1 +
crates/storage/opendal/src/azdls.rs | 6 +-
crates/storage/opendal/src/lib.rs | 311 ++++++++++++++++++++++++
crates/storage/opendal/tests/file_io_s3_test.rs | 43 ++++
9 files changed, 503 insertions(+), 3 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 908b5a58a..39812b010 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3548,6 +3548,7 @@ dependencies = [
"async-trait",
"bytes",
"cfg-if",
+ "futures",
"iceberg",
"iceberg_test_utils",
"opendal",
diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 341b19d09..594b070e0 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -19,6 +19,7 @@ use std::ops::Range;
use std::sync::{Arc, OnceLock};
use bytes::Bytes;
+use futures::{Stream, StreamExt};
use super::storage::{
LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig,
StorageFactory,
@@ -140,6 +141,18 @@ impl FileIO {
self.get_storage()?.delete_prefix(path.as_ref()).await
}
+ /// Delete multiple files from a stream of paths.
+ ///
+ /// # Arguments
+ ///
+ /// * paths: A stream of absolute paths starting with the scheme string
used to construct [`FileIO`].
+ pub async fn delete_stream(
+ &self,
+ paths: impl Stream<Item = String> + Send + 'static,
+ ) -> Result<()> {
+ self.get_storage()?.delete_stream(paths.boxed()).await
+ }
+
/// Check file exists.
///
/// # Arguments
diff --git a/crates/iceberg/src/io/storage/local_fs.rs
b/crates/iceberg/src/io/storage/local_fs.rs
index d6dd5b433..e96e951ba 100644
--- a/crates/iceberg/src/io/storage/local_fs.rs
+++ b/crates/iceberg/src/io/storage/local_fs.rs
@@ -29,6 +29,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
+use futures::StreamExt;
+use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
use crate::io::{
@@ -200,6 +202,13 @@ impl Storage for LocalFsStorage {
Ok(())
}
+ async fn delete_stream(&self, mut paths: BoxStream<'static, String>) ->
Result<()> {
+ while let Some(path) = paths.next().await {
+ self.delete(&path).await?;
+ }
+ Ok(())
+ }
+
fn new_input(&self, path: &str) -> Result<InputFile> {
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
}
@@ -534,4 +543,61 @@ mod tests {
assert!(path.exists());
}
+
+ #[tokio::test]
+ async fn test_local_fs_storage_delete_stream() {
+ use futures::stream;
+
+ let tmp_dir = TempDir::new().unwrap();
+ let storage = LocalFsStorage::new();
+
+ // Create multiple files
+ let file1 = tmp_dir.path().join("file1.txt");
+ let file2 = tmp_dir.path().join("file2.txt");
+ let file3 = tmp_dir.path().join("file3.txt");
+
+ storage
+ .write(file1.to_str().unwrap(), Bytes::from("1"))
+ .await
+ .unwrap();
+ storage
+ .write(file2.to_str().unwrap(), Bytes::from("2"))
+ .await
+ .unwrap();
+ storage
+ .write(file3.to_str().unwrap(), Bytes::from("3"))
+ .await
+ .unwrap();
+
+ // Verify files exist
+ assert!(storage.exists(file1.to_str().unwrap()).await.unwrap());
+ assert!(storage.exists(file2.to_str().unwrap()).await.unwrap());
+ assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());
+
+ // Delete multiple files using stream
+ let paths = vec![
+ file1.to_str().unwrap().to_string(),
+ file2.to_str().unwrap().to_string(),
+ ];
+ let path_stream = stream::iter(paths).boxed();
+ storage.delete_stream(path_stream).await.unwrap();
+
+ // Verify deleted files no longer exist
+ assert!(!storage.exists(file1.to_str().unwrap()).await.unwrap());
+ assert!(!storage.exists(file2.to_str().unwrap()).await.unwrap());
+
+ // Verify file3 still exists
+ assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_local_fs_storage_delete_stream_empty() {
+ use futures::stream;
+
+ let storage = LocalFsStorage::new();
+
+ // Delete with empty stream should succeed
+ let path_stream = stream::iter(Vec::<String>::new()).boxed();
+ storage.delete_stream(path_stream).await.unwrap();
+ }
}
diff --git a/crates/iceberg/src/io/storage/memory.rs
b/crates/iceberg/src/io/storage/memory.rs
index cb01ee470..f33dbd07b 100644
--- a/crates/iceberg/src/io/storage/memory.rs
+++ b/crates/iceberg/src/io/storage/memory.rs
@@ -28,6 +28,8 @@ use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use bytes::Bytes;
+use futures::StreamExt;
+use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
use crate::io::{
@@ -220,6 +222,13 @@ impl Storage for MemoryStorage {
Ok(())
}
+ async fn delete_stream(&self, mut paths: BoxStream<'static, String>) ->
Result<()> {
+ while let Some(path) = paths.next().await {
+ self.delete(&path).await?;
+ }
+ Ok(())
+ }
+
fn new_input(&self, path: &str) -> Result<InputFile> {
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
}
@@ -594,4 +603,56 @@ mod tests {
assert_eq!(storage.read("/path/to/file").await.unwrap(), content);
assert_eq!(storage.read("path/to/file").await.unwrap(), content);
}
+
+ #[tokio::test]
+ async fn test_memory_storage_delete_stream() {
+ use futures::stream;
+
+ let storage = MemoryStorage::new();
+
+ // Create multiple files
+ storage
+ .write("memory://file1.txt", Bytes::from("1"))
+ .await
+ .unwrap();
+ storage
+ .write("memory://file2.txt", Bytes::from("2"))
+ .await
+ .unwrap();
+ storage
+ .write("memory://file3.txt", Bytes::from("3"))
+ .await
+ .unwrap();
+
+ // Verify files exist
+ assert!(storage.exists("memory://file1.txt").await.unwrap());
+ assert!(storage.exists("memory://file2.txt").await.unwrap());
+ assert!(storage.exists("memory://file3.txt").await.unwrap());
+
+ // Delete multiple files using stream
+ let paths = vec![
+ "memory://file1.txt".to_string(),
+ "memory://file2.txt".to_string(),
+ ];
+ let path_stream = stream::iter(paths).boxed();
+ storage.delete_stream(path_stream).await.unwrap();
+
+ // Verify deleted files no longer exist
+ assert!(!storage.exists("memory://file1.txt").await.unwrap());
+ assert!(!storage.exists("memory://file2.txt").await.unwrap());
+
+ // Verify file3 still exists
+ assert!(storage.exists("memory://file3.txt").await.unwrap());
+ }
+
+ #[tokio::test]
+ async fn test_memory_storage_delete_stream_empty() {
+ use futures::stream;
+
+ let storage = MemoryStorage::new();
+
+ // Delete with empty stream should succeed
+ let path_stream = stream::iter(Vec::<String>::new()).boxed();
+ storage.delete_stream(path_stream).await.unwrap();
+ }
}
diff --git a/crates/iceberg/src/io/storage/mod.rs
b/crates/iceberg/src/io/storage/mod.rs
index 3c7c555a5..5276c7771 100644
--- a/crates/iceberg/src/io/storage/mod.rs
+++ b/crates/iceberg/src/io/storage/mod.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
pub use config::*;
+use futures::stream::BoxStream;
pub use local_fs::{LocalFsStorage, LocalFsStorageFactory};
pub use memory::{MemoryStorage, MemoryStorageFactory};
@@ -93,6 +94,9 @@ pub trait Storage: Debug + Send + Sync {
/// Delete all files with the given prefix
async fn delete_prefix(&self, path: &str) -> Result<()>;
+ /// Delete multiple files from a stream of paths.
+ async fn delete_stream(&self, paths: BoxStream<'static, String>) ->
Result<()>;
+
/// Create a new input file for reading
fn new_input(&self, path: &str) -> Result<InputFile>;
diff --git a/crates/storage/opendal/Cargo.toml
b/crates/storage/opendal/Cargo.toml
index e0a3cf8ed..84f7e1147 100644
--- a/crates/storage/opendal/Cargo.toml
+++ b/crates/storage/opendal/Cargo.toml
@@ -49,6 +49,7 @@ reqwest = { workspace = true }
serde = { workspace = true }
typetag = { workspace = true }
url = { workspace = true }
+futures = { workspace = true }
[dev-dependencies]
async-trait = { workspace = true }
diff --git a/crates/storage/opendal/src/azdls.rs
b/crates/storage/opendal/src/azdls.rs
index 70caae7c4..6251f8cda 100644
--- a/crates/storage/opendal/src/azdls.rs
+++ b/crates/storage/opendal/src/azdls.rs
@@ -160,7 +160,7 @@ impl FromStr for AzureStorageScheme {
}
/// Validates whether the given path matches what's configured for the backend.
-fn match_path_with_config(
+pub(crate) fn match_path_with_config(
path: &AzureStoragePath,
config: &AzdlsConfig,
configured_scheme: &AzureStorageScheme,
@@ -220,7 +220,7 @@ fn azdls_config_build(config: &AzdlsConfig, path:
&AzureStoragePath) -> Result<o
/// Represents a fully qualified path to blob/ file in Azure Storage.
#[derive(Debug, PartialEq)]
-struct AzureStoragePath {
+pub(crate) struct AzureStoragePath {
/// The scheme of the URL, e.g., `abfss`, `abfs`, `wasbs`, or `wasb`.
scheme: AzureStorageScheme,
@@ -236,7 +236,7 @@ struct AzureStoragePath {
/// Path to the file.
///
/// It is relative to the `root` of the `AzdlsConfig`.
- path: String,
+ pub(crate) path: String,
}
impl AzureStoragePath {
diff --git a/crates/storage/opendal/src/lib.rs
b/crates/storage/opendal/src/lib.rs
index 1e5043aca..7c11f80ad 100644
--- a/crates/storage/opendal/src/lib.rs
+++ b/crates/storage/opendal/src/lib.rs
@@ -24,11 +24,15 @@
mod utils;
+use std::collections::HashMap;
+use std::collections::hash_map::Entry;
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use cfg_if::cfg_if;
+use futures::StreamExt;
+use futures::stream::BoxStream;
use iceberg::io::{
FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage,
StorageConfig,
StorageFactory,
@@ -336,6 +340,101 @@ impl OpenDalStorage {
let operator = operator.layer(RetryLayer::new());
Ok((operator, relative_path))
}
+
+ /// Extracts the relative path from an absolute path without building an
operator.
+ ///
+ /// This is a lightweight alternative to
[`create_operator`](Self::create_operator) for cases
+ /// where only the relative path is needed (e.g. bulk deletes where the
operator is already
+ /// available).
+ #[allow(unreachable_code, unused_variables)]
+ pub(crate) fn relativize_path<'a>(&self, path: &'a str) -> Result<&'a str>
{
+ match self {
+ #[cfg(feature = "opendal-memory")]
+ OpenDalStorage::Memory(_) =>
Ok(path.strip_prefix("memory:/").unwrap_or(&path[1..])),
+ #[cfg(feature = "opendal-fs")]
+ OpenDalStorage::LocalFs =>
Ok(path.strip_prefix("file:/").unwrap_or(&path[1..])),
+ #[cfg(feature = "opendal-s3")]
+ OpenDalStorage::S3 {
+ configured_scheme, ..
+ } => {
+ let url = url::Url::parse(path)?;
+ let bucket = url.host_str().ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid s3 url: {path}, missing bucket"),
+ )
+ })?;
+ let prefix = format!("{}://{}/", configured_scheme, bucket);
+ if path.starts_with(&prefix) {
+ Ok(&path[prefix.len()..])
+ } else {
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid s3 url: {path}, should start with
{prefix}"),
+ ))
+ }
+ }
+ #[cfg(feature = "opendal-gcs")]
+ OpenDalStorage::Gcs { .. } => {
+ let url = url::Url::parse(path)?;
+ let bucket = url.host_str().ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid gcs url: {path}, missing bucket"),
+ )
+ })?;
+ let prefix = format!("gs://{}/", bucket);
+ if path.starts_with(&prefix) {
+ Ok(&path[prefix.len()..])
+ } else {
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid gcs url: {path}, should start with
{prefix}"),
+ ))
+ }
+ }
+ #[cfg(feature = "opendal-oss")]
+ OpenDalStorage::Oss { .. } => {
+ let url = url::Url::parse(path)?;
+ let bucket = url.host_str().ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid oss url: {path}, missing bucket"),
+ )
+ })?;
+ let prefix = format!("oss://{}/", bucket);
+ if path.starts_with(&prefix) {
+ Ok(&path[prefix.len()..])
+ } else {
+ Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Invalid oss url: {path}, should start with
{prefix}"),
+ ))
+ }
+ }
+ #[cfg(feature = "opendal-azdls")]
+ OpenDalStorage::Azdls {
+ configured_scheme,
+ config,
+ } => {
+ let azure_path = path.parse::<AzureStoragePath>()?;
+ match_path_with_config(&azure_path, config,
configured_scheme)?;
+ let relative_path_len = azure_path.path.len();
+ Ok(&path[path.len() - relative_path_len..])
+ }
+ #[cfg(all(
+ not(feature = "opendal-s3"),
+ not(feature = "opendal-fs"),
+ not(feature = "opendal-gcs"),
+ not(feature = "opendal-oss"),
+ not(feature = "opendal-azdls"),
+ ))]
+ _ => Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ "No storage service has been enabled",
+ )),
+ }
+ }
}
#[typetag::serde(name = "OpenDalStorage")]
@@ -400,6 +499,40 @@ impl Storage for OpenDalStorage {
Ok(op.remove_all(&path).await.map_err(from_opendal_error)?)
}
+ async fn delete_stream(&self, mut paths: BoxStream<'static, String>) ->
Result<()> {
+ let mut deleters: HashMap<String, opendal::Deleter> = HashMap::new();
+
+ while let Some(path) = paths.next().await {
+ let bucket = url::Url::parse(&path)
+ .ok()
+ .and_then(|u| u.host_str().map(|s| s.to_string()))
+ .unwrap_or_default();
+
+ let (relative_path, deleter) = match deleters.entry(bucket) {
+ Entry::Occupied(entry) => {
+ (self.relativize_path(&path)?.to_string(),
entry.into_mut())
+ }
+ Entry::Vacant(entry) => {
+ let (op, rel) = self.create_operator(&path)?;
+ let rel = rel.to_string();
+ let deleter =
op.deleter().await.map_err(from_opendal_error)?;
+ (rel, entry.insert(deleter))
+ }
+ };
+
+ deleter
+ .delete(relative_path)
+ .await
+ .map_err(from_opendal_error)?;
+ }
+
+ for (_, mut deleter) in deleters {
+ deleter.close().await.map_err(from_opendal_error)?;
+ }
+
+ Ok(())
+ }
+
#[allow(unreachable_code, unused_variables)]
fn new_input(&self, path: &str) -> Result<InputFile> {
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
@@ -457,4 +590,182 @@ mod tests {
let op = default_memory_operator();
assert_eq!(op.info().scheme().to_string(), "memory");
}
+
+ #[cfg(feature = "opendal-memory")]
+ #[test]
+ fn test_relativize_path_memory() {
+ let storage = OpenDalStorage::Memory(default_memory_operator());
+
+ assert_eq!(
+ storage.relativize_path("memory:/path/to/file").unwrap(),
+ "path/to/file"
+ );
+ // Without the scheme prefix, falls back to stripping the leading slash
+ assert_eq!(
+ storage.relativize_path("/path/to/file").unwrap(),
+ "path/to/file"
+ );
+ }
+
+ #[cfg(feature = "opendal-fs")]
+ #[test]
+ fn test_relativize_path_fs() {
+ let storage = OpenDalStorage::LocalFs;
+
+ assert_eq!(
+ storage
+ .relativize_path("file:/tmp/data/file.parquet")
+ .unwrap(),
+ "tmp/data/file.parquet"
+ );
+ assert_eq!(
+ storage.relativize_path("/tmp/data/file.parquet").unwrap(),
+ "tmp/data/file.parquet"
+ );
+ }
+
+ #[cfg(feature = "opendal-s3")]
+ #[test]
+ fn test_relativize_path_s3() {
+ let storage = OpenDalStorage::S3 {
+ configured_scheme: "s3".to_string(),
+ config: Arc::new(S3Config::default()),
+ customized_credential_load: None,
+ };
+
+ assert_eq!(
+ storage
+ .relativize_path("s3://my-bucket/path/to/file.parquet")
+ .unwrap(),
+ "path/to/file.parquet"
+ );
+
+ // s3a scheme
+ let storage_s3a = OpenDalStorage::S3 {
+ configured_scheme: "s3a".to_string(),
+ config: Arc::new(S3Config::default()),
+ customized_credential_load: None,
+ };
+ assert_eq!(
+ storage_s3a
+ .relativize_path("s3a://my-bucket/path/to/file.parquet")
+ .unwrap(),
+ "path/to/file.parquet"
+ );
+ }
+
+ #[cfg(feature = "opendal-s3")]
+ #[test]
+ fn test_relativize_path_s3_scheme_mismatch() {
+ let storage = OpenDalStorage::S3 {
+ configured_scheme: "s3".to_string(),
+ config: Arc::new(S3Config::default()),
+ customized_credential_load: None,
+ };
+
+ // Scheme mismatch should error
+ assert!(
+ storage
+ .relativize_path("s3a://my-bucket/path/to/file.parquet")
+ .is_err()
+ );
+ }
+
+ #[cfg(feature = "opendal-gcs")]
+ #[test]
+ fn test_relativize_path_gcs() {
+ let storage = OpenDalStorage::Gcs {
+ config: Arc::new(GcsConfig::default()),
+ };
+
+ assert_eq!(
+ storage
+ .relativize_path("gs://my-bucket/path/to/file.parquet")
+ .unwrap(),
+ "path/to/file.parquet"
+ );
+ }
+
+ #[cfg(feature = "opendal-gcs")]
+ #[test]
+ fn test_relativize_path_gcs_invalid_scheme() {
+ let storage = OpenDalStorage::Gcs {
+ config: Arc::new(GcsConfig::default()),
+ };
+
+ assert!(
+ storage
+ .relativize_path("s3://my-bucket/path/to/file.parquet")
+ .is_err()
+ );
+ }
+
+ #[cfg(feature = "opendal-oss")]
+ #[test]
+ fn test_relativize_path_oss() {
+ let storage = OpenDalStorage::Oss {
+ config: Arc::new(OssConfig::default()),
+ };
+
+ assert_eq!(
+ storage
+ .relativize_path("oss://my-bucket/path/to/file.parquet")
+ .unwrap(),
+ "path/to/file.parquet"
+ );
+ }
+
+ #[cfg(feature = "opendal-oss")]
+ #[test]
+ fn test_relativize_path_oss_invalid_scheme() {
+ let storage = OpenDalStorage::Oss {
+ config: Arc::new(OssConfig::default()),
+ };
+
+ assert!(
+ storage
+ .relativize_path("s3://my-bucket/path/to/file.parquet")
+ .is_err()
+ );
+ }
+
+ #[cfg(feature = "opendal-azdls")]
+ #[test]
+ fn test_relativize_path_azdls() {
+ let storage = OpenDalStorage::Azdls {
+ configured_scheme: AzureStorageScheme::Abfss,
+ config: Arc::new(AzdlsConfig {
+ account_name: Some("myaccount".to_string()),
+ endpoint:
Some("https://myaccount.dfs.core.windows.net".to_string()),
+ ..Default::default()
+ }),
+ };
+
+ assert_eq!(
+ storage
+
.relativize_path("abfss://[email protected]/path/to/file.parquet")
+ .unwrap(),
+ "/path/to/file.parquet"
+ );
+ }
+
+ #[cfg(feature = "opendal-azdls")]
+ #[test]
+ fn test_relativize_path_azdls_scheme_mismatch() {
+ let storage = OpenDalStorage::Azdls {
+ configured_scheme: AzureStorageScheme::Abfss,
+ config: Arc::new(AzdlsConfig {
+ account_name: Some("myaccount".to_string()),
+ endpoint:
Some("https://myaccount.dfs.core.windows.net".to_string()),
+ ..Default::default()
+ }),
+ };
+
+ // wasbs scheme doesn't match configured abfss
+ assert!(
+ storage
+
.relativize_path("wasbs://[email protected]/path/to/file.parquet")
+ .is_err()
+ );
+ }
}
diff --git a/crates/storage/opendal/tests/file_io_s3_test.rs
b/crates/storage/opendal/tests/file_io_s3_test.rs
index 5801af060..207a4454d 100644
--- a/crates/storage/opendal/tests/file_io_s3_test.rs
+++ b/crates/storage/opendal/tests/file_io_s3_test.rs
@@ -24,6 +24,7 @@ mod tests {
use std::sync::Arc;
use async_trait::async_trait;
+ use futures::StreamExt;
use iceberg::io::{
FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION,
S3_SECRET_ACCESS_KEY,
};
@@ -203,4 +204,46 @@ mod tests {
}
}
}
+
+ #[tokio::test]
+ async fn test_file_io_s3_delete_stream() {
+ let file_io = get_file_io().await;
+
+ // Write multiple files
+ let paths: Vec<String> = (0..5)
+ .map(|i| {
+ format!(
+ "s3://bucket1/{}/file-{i}",
+
normalize_test_name_with_parts!("test_file_io_s3_delete_stream")
+ )
+ })
+ .collect();
+ for path in &paths {
+ let _ = file_io.delete(path).await;
+ file_io
+ .new_output(path)
+ .unwrap()
+ .write("delete-me".into())
+ .await
+ .unwrap();
+ assert!(file_io.exists(path).await.unwrap());
+ }
+
+ // Delete via delete_stream
+ let stream = futures::stream::iter(paths.clone()).boxed();
+ file_io.delete_stream(stream).await.unwrap();
+
+ // Verify all files are gone
+ for path in &paths {
+ assert!(!file_io.exists(path).await.unwrap());
+ }
+ }
+
+ #[tokio::test]
+ async fn test_file_io_s3_delete_stream_empty() {
+ let file_io = get_file_io().await;
+ let stream = futures::stream::empty().boxed();
+ // Should succeed with no-op
+ file_io.delete_stream(stream).await.unwrap();
+ }
}