This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 596e1f6 fix(local): fsync create-mode rename source delete (#758)
596e1f6 is described below
commit 596e1f695bd7d351a0a5d89fff14d1d2b87b853a
Author: Kevin Liu <[email protected]>
AuthorDate: Thu Jun 18 06:36:14 2026 -0400
fix(local): fsync create-mode rename source delete (#758)
* fix(local): fsync create-mode rename delete
* better
---
src/local.rs | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 65 insertions(+), 6 deletions(-)
diff --git a/src/local.rs b/src/local.rs
index b81cf7e..1ef18e1 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -44,7 +44,7 @@ use crate::{
path::{Path, absolute_path_to_url},
util::InvalidGetRange,
};
-use crate::{CopyMode, CopyOptions, ObjectStoreExt, RenameOptions,
RenameTargetMode};
+use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
/// A specialized `Error` for filesystem object store-related errors
#[derive(Debug, thiserror::Error)]
@@ -524,7 +524,9 @@ impl ObjectStore for LocalFileSystem {
let config = Arc::clone(&config);
maybe_spawn_blocking(move || {
let location = location?;
- Self::delete_location(config, automatic_cleanup,
&location)?;
+ // `with_fsync` does not apply to standalone deletes; only
create-mode rename
+ // fsyncs its internal source removal as part of the
durable copy-and-delete.
+ Self::delete_location(config, automatic_cleanup,
&location, false)?;
Ok(location)
})
})
@@ -727,7 +729,14 @@ impl ObjectStore for LocalFileSystem {
},
)
.await?;
- self.delete(from).await?;
+ let config = Arc::clone(&self.config);
+ let automatic_cleanup = self.automatic_cleanup;
+ let fsync = self.fsync;
+ let from = from.clone();
+ maybe_spawn_blocking(move || {
+ Self::delete_location(config, automatic_cleanup, &from,
fsync)
+ })
+ .await?;
Ok(())
}
}
@@ -739,6 +748,7 @@ impl LocalFileSystem {
config: Arc<Config>,
automatic_cleanup: bool,
location: &Path,
+ fsync: bool,
) -> Result<()> {
let path = config.path_to_filesystem(location)?;
if let Err(e) = std::fs::remove_file(&path) {
@@ -746,7 +756,18 @@ impl LocalFileSystem {
ErrorKind::NotFound => Error::NotFound { path, source: e
}.into(),
_ => Error::UnableToDeleteFile { path, source: e }.into(),
})
- } else if automatic_cleanup {
+ } else {
+ if fsync {
+ fsync_parent_dir(&path).map_err(|source|
Error::UnableToSyncFile {
+ source,
+ path: path.clone(),
+ })?;
+ }
+
+ if !automatic_cleanup {
+ return Ok(());
+ }
+
let root = &config.root;
let root = root
.to_file_path()
@@ -763,8 +784,6 @@ impl LocalFileSystem {
}
}
- Ok(())
- } else {
Ok(())
}
}
@@ -1481,6 +1500,9 @@ mod tests {
use futures_util::TryStreamExt;
use tempfile::TempDir;
+ #[cfg(target_family = "unix")]
+ use std::os::unix::fs::PermissionsExt;
+
#[cfg(target_family = "unix")]
use tempfile::NamedTempFile;
@@ -1570,6 +1592,43 @@ mod tests {
assert_eq!(read, data);
}
+    /// With `with_fsync(true)`, a create-mode rename must surface a failure to
+    /// fsync the source's parent directory after the source file is removed,
+    /// while leaving the destination populated and the source gone.
+    #[tokio::test]
+    #[cfg(target_family = "unix")]
+    async fn fsync_rename_if_not_exists_propagates_source_delete_sync_error() {
+        let root = TempDir::new().unwrap();
+        let integration = LocalFileSystem::new_with_prefix(root.path())
+            .unwrap()
+            .with_fsync(true);
+
+        let source = Path::from("source_dir/source_file");
+        let dest = Path::from("dest_dir/dest_file");
+        integration.put(&source, "data".into()).await.unwrap();
+
+        let source_dir = root.path().join("source_dir");
+        let original_permissions = fs::metadata(&source_dir).unwrap().permissions();
+        // Allow unlinking the file from the directory, but prevent opening the
+        // directory for fsync. Without the source-parent fsync, rename succeeds;
+        // with it, the sync error is propagated after the source is deleted.
+        fs::set_permissions(&source_dir, fs::Permissions::from_mode(0o300)).unwrap();
+
+        // Privileged users (e.g. root inside a CI container) bypass directory
+        // permission bits, so the directory can still be opened and the fsync
+        // cannot be forced to fail this way. Skip instead of failing spuriously.
+        if fs::File::open(&source_dir).is_ok() {
+            fs::set_permissions(&source_dir, original_permissions).unwrap();
+            eprintln!("skipping: directory permission bits are not enforced for this user");
+            return;
+        }
+
+        let result = integration.rename_if_not_exists(&source, &dest).await;
+        // Restore permissions before asserting so a failed assertion does not
+        // leave an unreadable directory behind for `TempDir` cleanup.
+        fs::set_permissions(&source_dir, original_permissions).unwrap();
+
+        match result {
+            Err(crate::Error::Generic { source, .. }) => {
+                assert!(source.to_string().contains("Unable to sync data to disk"));
+            }
+            other => panic!("expected source parent fsync to fail, got {other:?}"),
+        }
+
+        // The copy half of the rename completed before the failing fsync.
+        let read = integration.get(&dest).await.unwrap().bytes().await.unwrap();
+        assert_eq!(read, Bytes::from("data"));
+        // The source was unlinked before the fsync error was reported.
+        assert!(matches!(
+            integration.get(&source).await.unwrap_err(),
+            crate::Error::NotFound { .. }
+        ));
+    }
+
+
#[test]
#[cfg(target_family = "unix")]
fn test_non_tokio() {