This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 596e1f6  fix(local): fsync create-mode rename source delete (#758)
596e1f6 is described below

commit 596e1f695bd7d351a0a5d89fff14d1d2b87b853a
Author: Kevin Liu <[email protected]>
AuthorDate: Thu Jun 18 06:36:14 2026 -0400

    fix(local): fsync create-mode rename source delete (#758)
    
    * fix(local): fsync create-mode rename delete
    
    * better
---
 src/local.rs | 71 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 65 insertions(+), 6 deletions(-)

diff --git a/src/local.rs b/src/local.rs
index b81cf7e..1ef18e1 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -44,7 +44,7 @@ use crate::{
     path::{Path, absolute_path_to_url},
     util::InvalidGetRange,
 };
-use crate::{CopyMode, CopyOptions, ObjectStoreExt, RenameOptions, RenameTargetMode};
+use crate::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
 
 /// A specialized `Error` for filesystem object store-related errors
 #[derive(Debug, thiserror::Error)]
@@ -524,7 +524,9 @@ impl ObjectStore for LocalFileSystem {
                 let config = Arc::clone(&config);
                 maybe_spawn_blocking(move || {
                     let location = location?;
-                    Self::delete_location(config, automatic_cleanup, &location)?;
+                    // `with_fsync` does not apply to standalone deletes; only create-mode rename
+                    // fsyncs its internal source removal as part of the durable copy-and-delete.
+                    Self::delete_location(config, automatic_cleanup, &location, false)?;
                     Ok(location)
                 })
             })
@@ -727,7 +729,14 @@ impl ObjectStore for LocalFileSystem {
                     },
                 )
                 .await?;
-                self.delete(from).await?;
+                let config = Arc::clone(&self.config);
+                let automatic_cleanup = self.automatic_cleanup;
+                let fsync = self.fsync;
+                let from = from.clone();
+                maybe_spawn_blocking(move || {
+                    Self::delete_location(config, automatic_cleanup, &from, fsync)
+                })
+                .await?;
                 Ok(())
             }
         }
@@ -739,6 +748,7 @@ impl LocalFileSystem {
         config: Arc<Config>,
         automatic_cleanup: bool,
         location: &Path,
+        fsync: bool,
     ) -> Result<()> {
         let path = config.path_to_filesystem(location)?;
         if let Err(e) = std::fs::remove_file(&path) {
@@ -746,7 +756,18 @@ impl LocalFileSystem {
                 ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
                 _ => Error::UnableToDeleteFile { path, source: e }.into(),
             })
-        } else if automatic_cleanup {
+        } else {
+            if fsync {
+                fsync_parent_dir(&path).map_err(|source| Error::UnableToSyncFile {
+                    source,
+                    path: path.clone(),
+                })?;
+            }
+
+            if !automatic_cleanup {
+                return Ok(());
+            }
+
             let root = &config.root;
             let root = root
                 .to_file_path()
@@ -763,8 +784,6 @@ impl LocalFileSystem {
                 }
             }
 
-            Ok(())
-        } else {
             Ok(())
         }
     }
@@ -1481,6 +1500,9 @@ mod tests {
     use futures_util::TryStreamExt;
     use tempfile::TempDir;
 
+    #[cfg(target_family = "unix")]
+    use std::os::unix::fs::PermissionsExt;
+
     #[cfg(target_family = "unix")]
     use tempfile::NamedTempFile;
 
@@ -1570,6 +1592,43 @@ mod tests {
         assert_eq!(read, data);
     }
 
+    #[tokio::test]
+    #[cfg(target_family = "unix")]
+    async fn fsync_rename_if_not_exists_propagates_source_delete_sync_error() {
+        let root = TempDir::new().unwrap();
+        let integration = LocalFileSystem::new_with_prefix(root.path())
+            .unwrap()
+            .with_fsync(true);
+
+        let source = Path::from("source_dir/source_file");
+        let dest = Path::from("dest_dir/dest_file");
+        integration.put(&source, "data".into()).await.unwrap();
+
+        let source_dir = root.path().join("source_dir");
+        let original_permissions = fs::metadata(&source_dir).unwrap().permissions();
+        // Allow unlinking the file from the directory, but prevent opening the
+        // directory for fsync. Without the source-parent fsync, rename succeeds;
+        // with it, the sync error is propagated after the source is deleted.
+        fs::set_permissions(&source_dir, fs::Permissions::from_mode(0o300)).unwrap();
+
+        let result = integration.rename_if_not_exists(&source, &dest).await;
+        fs::set_permissions(&source_dir, original_permissions).unwrap();
+
+        match result {
+            Err(crate::Error::Generic { source, .. }) => {
+                assert!(source.to_string().contains("Unable to sync data to disk"));
+            }
+            _ => panic!("expected source parent fsync to fail"),
+        }
+
+        let read = integration.get(&dest).await.unwrap().bytes().await.unwrap();
+        assert_eq!(read, Bytes::from("data"));
+        assert!(matches!(
+            integration.get(&source).await.unwrap_err(),
+            crate::Error::NotFound { .. }
+        ));
+    }
+
     #[test]
     #[cfg(target_family = "unix")]
     fn test_non_tokio() {

Reply via email to