This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b8cf52  feat(local): explicit close with error checking for LocalFileSystem (#676)
9b8cf52 is described below

commit 9b8cf5246d6fade1e425b974ac58259af96748f6
Author: Jonathan Giannuzzi <[email protected]>
AuthorDate: Fri Apr 17 10:49:44 2026 +0100

    feat(local): explicit close with error checking for LocalFileSystem (#676)
---
 Cargo.toml   |   8 ++++-
 src/local.rs | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++--------
 2 files changed, 101 insertions(+), 16 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 54bc161..b92e4d7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -65,6 +65,12 @@ serde_urlencoded = { version = "0.7", optional = true }
 tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"], optional = true }
 tracing = { version = "0.1", optional = true }
 
+[target.'cfg(target_family="unix")'.dependencies]
+nix = { version = "0.31.1", default-features = false, optional = true }
+
+[target.'cfg(target_family="windows")'.dependencies]
+windows-sys = { version = "0.61.2", default-features = false, features = ["Win32_Foundation"], optional = true }
+
 [target.'cfg(target_family="unix")'.dev-dependencies]
 nix = { version = "0.31.1", features = ["fs"] }
 
@@ -77,7 +83,7 @@ futures-channel = {version = "0.3", features = ["sink"]}
 default = ["fs"]
 cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded", "tokio"]
 azure = ["cloud", "httparse"]
-fs = ["walkdir", "tokio"]
+fs = ["walkdir", "tokio", "nix", "windows-sys"]
 gcp = ["cloud", "rustls-pki-types"]
 aws = ["cloud", "md-5"]
 http = ["cloud"]
diff --git a/src/local.rs b/src/local.rs
index a12a775..2e71ce6 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -137,6 +137,32 @@ impl From<Error> for super::Error {
     }
 }
 
+/// Explicitly close a file, checking for errors that would be silently ignored by Rust's `File::drop()`.
+///
+/// On network filesystems (e.g. NFS), `close()` can fail and indicate data loss.
+fn close_file(file: File) -> std::result::Result<(), io::Error> {
+    #[cfg(target_family = "unix")]
+    {
+        nix::unistd::close(file).map_err(|e| e.into())
+    }
+    #[cfg(target_family = "windows")]
+    {
+        use std::os::windows::io::IntoRawHandle;
+
+        let handle = file.into_raw_handle();
+        // SAFETY: `handle` is a valid, owned handle obtained from `into_raw_handle()`.
+        match unsafe { windows_sys::Win32::Foundation::CloseHandle(handle) } {
+            0 => Err(io::Error::last_os_error()),
+            _ => Ok(()),
+        }
+    }
+    #[cfg(not(any(target_family = "unix", target_family = "windows")))]
+    {
+        drop(file);
+        Ok(())
+    }
+}
+
 /// Local filesystem storage providing an [`ObjectStore`] interface to files on
 /// local disk. Can optionally be created with a directory prefix
 ///
@@ -359,16 +385,17 @@ impl ObjectStore for LocalFileSystem {
                         path: path.to_string_lossy().to_string(),
                     })?;
                     e_tag = Some(get_etag(&metadata));
+                    // Explicitly close the file, checking for errors that would be silently ignored by drop.
+                    // On network filesystems (e.g. NFS), close can fail and indicate data loss.
+                    //
+                    // This also ensures the file is closed before rename, which is required by some FUSE
+                    // filesystems (e.g. Blobfuse) to trigger the upload operation.
+                    close_file(file).map_err(|source| Error::UnableToCopyDataToFile { source })?;
                     match opts.mode {
-                        PutMode::Overwrite => {
-                            // For some fuse types of file systems, the file must be closed first
-                            // to trigger the upload operation, and then renamed, such as Blobfuse
-                            std::mem::drop(file);
-                            match std::fs::rename(&staging_path, &path) {
-                                Ok(_) => None,
-                                Err(source) => Some(Error::UnableToRenameFile { source }),
-                            }
-                        }
+                        PutMode::Overwrite => match std::fs::rename(&staging_path, &path) {
+                            Ok(_) => None,
+                            Err(source) => Some(Error::UnableToRenameFile { source }),
+                        },
                         PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
                             Ok(_) => {
                                 let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
@@ -840,7 +867,7 @@ struct LocalUpload {
 #[derive(Debug)]
 struct UploadState {
     dest: PathBuf,
-    file: Mutex<File>,
+    file: Mutex<Option<File>>,
 }
 
 impl LocalUpload {
@@ -848,7 +875,7 @@ impl LocalUpload {
         Self {
             state: Arc::new(UploadState {
                 dest,
-                file: Mutex::new(file),
+                file: Mutex::new(Some(file)),
             }),
             src: Some(src),
             offset: 0,
@@ -864,7 +891,8 @@ impl MultipartUpload for LocalUpload {
 
         let s = Arc::clone(&self.state);
         maybe_spawn_blocking(move || {
-            let mut file = s.file.lock();
+            let mut guard = s.file.lock();
+            let file = guard.as_mut().ok_or(Error::Aborted)?;
             file.seek(SeekFrom::Start(offset)).map_err(|source| {
                 let path = s.dest.clone();
                 Error::Seek { source, path }
@@ -884,14 +912,24 @@ impl MultipartUpload for LocalUpload {
         let s = Arc::clone(&self.state);
         maybe_spawn_blocking(move || {
             // Ensure no inflight writes
-            let file = s.file.lock();
-            std::fs::rename(&src, &s.dest)
-                .map_err(|source| Error::UnableToRenameFile { source })?;
+            let mut guard = s.file.lock();
+            let file = guard.take().ok_or(Error::Aborted)?;
+
             let metadata = file.metadata().map_err(|e| Error::Metadata {
                 source: e.into(),
                 path: src.to_string_lossy().to_string(),
             })?;
 
+            // Explicitly close the file, checking for errors that would be silently ignored by drop.
+            // On network filesystems (e.g. NFS), close can fail and indicate data loss.
+            //
+            // This also ensures the file is closed before rename, which is required by some FUSE
+            // filesystems (e.g. Blobfuse) to trigger the upload operation.
+            close_file(file).map_err(|source| Error::UnableToCopyDataToFile { source })?;
+
+            std::fs::rename(&src, &s.dest)
+                .map_err(|source| Error::UnableToRenameFile { source })?;
+
             Ok(PutResult {
                 e_tag: Some(get_etag(&metadata)),
                 version: None,
@@ -1847,6 +1885,47 @@ mod tests {
         integration.delete(&location).await.unwrap();
         assert!(fs::read_dir(root.path()).unwrap().count() == 0);
     }
+
+    #[test]
+    #[cfg(target_family = "unix")]
+    fn test_close_file_detects_error_unix() {
+        use std::os::fd::FromRawFd;
+        use std::os::unix::io::AsRawFd;
+
+        let file = tempfile::tempfile().unwrap();
+
+        // Close and reclaim a File from the now-invalid fd
+        let file = {
+            let fd = file.as_raw_fd();
+            super::close_file(file).unwrap();
+            unsafe { std::fs::File::from_raw_fd(fd) }
+        };
+
+        let err = super::close_file(file).unwrap_err();
+        assert_eq!(err.raw_os_error(), Some(nix::libc::EBADF), "got: {err:?}");
+    }
+
+    #[test]
+    #[cfg(target_family = "windows")]
+    fn test_close_file_detects_error_windows() {
+        use std::os::windows::io::{AsRawHandle, FromRawHandle};
+
+        let file = tempfile::tempfile().unwrap();
+
+        // Close and reclaim a File from the now-invalid handle
+        let file = {
+            let handle = file.as_raw_handle();
+            super::close_file(file).unwrap();
+            unsafe { std::fs::File::from_raw_handle(handle) }
+        };
+
+        let err = super::close_file(file).unwrap_err();
+        assert_eq!(
+            err.raw_os_error(),
+            Some(windows_sys::Win32::Foundation::ERROR_INVALID_HANDLE as i32),
+            "got: {err:?}"
+        );
+    }
 }
 
 #[cfg(not(target_arch = "wasm32"))]

Reply via email to