This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 9b8cf52 feat(local): explicit close with error checking for LocalFileSystem (#676)
9b8cf52 is described below
commit 9b8cf5246d6fade1e425b974ac58259af96748f6
Author: Jonathan Giannuzzi <[email protected]>
AuthorDate: Fri Apr 17 10:49:44 2026 +0100
feat(local): explicit close with error checking for LocalFileSystem (#676)
---
Cargo.toml | 8 ++++-
src/local.rs | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++--------
2 files changed, 101 insertions(+), 16 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 54bc161..b92e4d7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -65,6 +65,12 @@ serde_urlencoded = { version = "0.7", optional = true }
tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"], optional = true }
tracing = { version = "0.1", optional = true }
+[target.'cfg(target_family="unix")'.dependencies]
+nix = { version = "0.31.1", default-features = false, optional = true }
+
+[target.'cfg(target_family="windows")'.dependencies]
+windows-sys = { version = "0.61.2", default-features = false, features = ["Win32_Foundation"], optional = true }
+
[target.'cfg(target_family="unix")'.dev-dependencies]
nix = { version = "0.31.1", features = ["fs"] }
@@ -77,7 +83,7 @@ futures-channel = {version = "0.3", features = ["sink"]}
default = ["fs"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded", "tokio"]
azure = ["cloud", "httparse"]
-fs = ["walkdir", "tokio"]
+fs = ["walkdir", "tokio", "nix", "windows-sys"]
gcp = ["cloud", "rustls-pki-types"]
aws = ["cloud", "md-5"]
http = ["cloud"]
diff --git a/src/local.rs b/src/local.rs
index a12a775..2e71ce6 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -137,6 +137,32 @@ impl From<Error> for super::Error {
}
}
+/// Explicitly close a file, checking for errors that would be silently ignored by Rust's `File::drop()`.
+///
+/// On network filesystems (e.g. NFS), `close()` can fail and indicate data loss.
+fn close_file(file: File) -> std::result::Result<(), io::Error> {
+ #[cfg(target_family = "unix")]
+ {
+ nix::unistd::close(file).map_err(|e| e.into())
+ }
+ #[cfg(target_family = "windows")]
+ {
+ use std::os::windows::io::IntoRawHandle;
+
+ let handle = file.into_raw_handle();
+ // SAFETY: `handle` is a valid, owned handle obtained from `into_raw_handle()`.
+ match unsafe { windows_sys::Win32::Foundation::CloseHandle(handle) } {
+ 0 => Err(io::Error::last_os_error()),
+ _ => Ok(()),
+ }
+ }
+ #[cfg(not(any(target_family = "unix", target_family = "windows")))]
+ {
+ drop(file);
+ Ok(())
+ }
+}
+
/// Local filesystem storage providing an [`ObjectStore`] interface to files on
/// local disk. Can optionally be created with a directory prefix
///
@@ -359,16 +385,17 @@ impl ObjectStore for LocalFileSystem {
path: path.to_string_lossy().to_string(),
})?;
e_tag = Some(get_etag(&metadata));
+ // Explicitly close the file, checking for errors that would be silently ignored by drop.
+ // On network filesystems (e.g. NFS), close can fail and indicate data loss.
+ //
+ // This also ensures the file is closed before rename, which is required by some FUSE
+ // filesystems (e.g. Blobfuse) to trigger the upload operation.
+ close_file(file).map_err(|source| Error::UnableToCopyDataToFile { source })?;
match opts.mode {
- PutMode::Overwrite => {
- // For some fuse types of file systems, the file must be closed first
- // to trigger the upload operation, and then renamed, such as Blobfuse
- std::mem::drop(file);
- match std::fs::rename(&staging_path, &path) {
- Ok(_) => None,
- Err(source) => Some(Error::UnableToRenameFile { source }),
- }
- }
+ PutMode::Overwrite => match std::fs::rename(&staging_path, &path) {
+ Ok(_) => None,
+ Err(source) => Some(Error::UnableToRenameFile { source }),
+ },
PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
Ok(_) => {
let _ = std::fs::remove_file(&staging_path);
// Attempt to cleanup
@@ -840,7 +867,7 @@ struct LocalUpload {
#[derive(Debug)]
struct UploadState {
dest: PathBuf,
- file: Mutex<File>,
+ file: Mutex<Option<File>>,
}
impl LocalUpload {
@@ -848,7 +875,7 @@ impl LocalUpload {
Self {
state: Arc::new(UploadState {
dest,
- file: Mutex::new(file),
+ file: Mutex::new(Some(file)),
}),
src: Some(src),
offset: 0,
@@ -864,7 +891,8 @@ impl MultipartUpload for LocalUpload {
let s = Arc::clone(&self.state);
maybe_spawn_blocking(move || {
- let mut file = s.file.lock();
+ let mut guard = s.file.lock();
+ let file = guard.as_mut().ok_or(Error::Aborted)?;
file.seek(SeekFrom::Start(offset)).map_err(|source| {
let path = s.dest.clone();
Error::Seek { source, path }
@@ -884,14 +912,24 @@ impl MultipartUpload for LocalUpload {
let s = Arc::clone(&self.state);
maybe_spawn_blocking(move || {
// Ensure no inflight writes
- let file = s.file.lock();
- std::fs::rename(&src, &s.dest)
- .map_err(|source| Error::UnableToRenameFile { source })?;
+ let mut guard = s.file.lock();
+ let file = guard.take().ok_or(Error::Aborted)?;
+
let metadata = file.metadata().map_err(|e| Error::Metadata {
source: e.into(),
path: src.to_string_lossy().to_string(),
})?;
+ // Explicitly close the file, checking for errors that would be silently ignored by drop.
+ // On network filesystems (e.g. NFS), close can fail and indicate data loss.
+ //
+ // This also ensures the file is closed before rename, which is required by some FUSE
+ // filesystems (e.g. Blobfuse) to trigger the upload operation.
+ close_file(file).map_err(|source| Error::UnableToCopyDataToFile { source })?;
+
+ std::fs::rename(&src, &s.dest)
+ .map_err(|source| Error::UnableToRenameFile { source })?;
+
Ok(PutResult {
e_tag: Some(get_etag(&metadata)),
version: None,
@@ -1847,6 +1885,47 @@ mod tests {
integration.delete(&location).await.unwrap();
assert!(fs::read_dir(root.path()).unwrap().count() == 0);
}
+
+ #[test]
+ #[cfg(target_family = "unix")]
+ fn test_close_file_detects_error_unix() {
+ use std::os::fd::FromRawFd;
+ use std::os::unix::io::AsRawFd;
+
+ let file = tempfile::tempfile().unwrap();
+
+ // Close and reclaim a File from the now-invalid fd
+ let file = {
+ let fd = file.as_raw_fd();
+ super::close_file(file).unwrap();
+ unsafe { std::fs::File::from_raw_fd(fd) }
+ };
+
+ let err = super::close_file(file).unwrap_err();
+ assert_eq!(err.raw_os_error(), Some(nix::libc::EBADF), "got: {err:?}");
+ }
+
+ #[test]
+ #[cfg(target_family = "windows")]
+ fn test_close_file_detects_error_windows() {
+ use std::os::windows::io::{AsRawHandle, FromRawHandle};
+
+ let file = tempfile::tempfile().unwrap();
+
+ // Close and reclaim a File from the now-invalid handle
+ let file = {
+ let handle = file.as_raw_handle();
+ super::close_file(file).unwrap();
+ unsafe { std::fs::File::from_raw_handle(handle) }
+ };
+
+ let err = super::close_file(file).unwrap_err();
+ assert_eq!(
+ err.raw_os_error(),
+ Some(windows_sys::Win32::Foundation::ERROR_INVALID_HANDLE as i32),
+ "got: {err:?}"
+ );
+ }
}
#[cfg(not(target_arch = "wasm32"))]