This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new dd0afcb feat(local): add fsync to LocalFileSystem for durability (#643)
dd0afcb is described below
commit dd0afcbc75d22a76f871101125bcc8782f6ef85f
Author: Pierre Barre <[email protected]>
AuthorDate: Wed Jun 17 13:48:52 2026 +0200
feat(local): add fsync to LocalFileSystem for durability (#643)
* feat(local): add opt-in fsync to LocalFileSystem for durability
* Fix docs
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
src/local.rs | 339 ++++++++++++++++++++++++++++++++++++++++++++++++++---------
1 file changed, 287 insertions(+), 52 deletions(-)
diff --git a/src/local.rs b/src/local.rs
index b3f3f7f..a627e2d 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -114,6 +114,9 @@ pub(crate) enum Error {
#[error("Filenames containing trailing '/#\\d+/' are not supported: {}",
path)]
InvalidPath { path: String },
+ #[error("Unable to sync data to disk for {}: {}", path.display(), source)]
+ UnableToSyncFile { source: io::Error, path: PathBuf },
+
#[error("Upload aborted")]
Aborted,
}
@@ -228,6 +231,8 @@ pub struct LocalFileSystem {
config: Arc<Config>,
// if you want to delete empty directories when deleting files
automatic_cleanup: bool,
+ // if true, fsync written files and their parent directories after writes
+ fsync: bool,
}
#[derive(Debug)]
@@ -255,6 +260,7 @@ impl LocalFileSystem {
root: Url::parse("file:///").unwrap(),
}),
automatic_cleanup: false,
+ fsync: false,
}
}
@@ -273,6 +279,7 @@ impl LocalFileSystem {
root: absolute_path_to_url(path)?,
}),
automatic_cleanup: false,
+ fsync: false,
})
}
@@ -286,6 +293,25 @@ impl LocalFileSystem {
self.automatic_cleanup = automatic_cleanup;
self
}
+
+ /// Enable `fsync` after writes for durability
+ ///
+ /// When enabled, [`LocalFileSystem`] calls [`File::sync_all`] on written files and fsyncs
+ /// the affected parent directories before a write operation
+ /// ([`put_opts`](ObjectStore::put_opts), [`copy_opts`](ObjectStore::copy_opts),
+ /// [`rename_opts`](ObjectStore::rename_opts), and multipart upload completion) returns
+ /// success. This guarantees that both the file contents and the directory entries pointing
+ /// to them are durable on stable storage, matching the implicit durability contract of
+ /// remote object stores such as S3 or GCS.
+ ///
+ /// This trades write throughput for durability and is **disabled by default**.
+ ///
+ /// Note that directory fsync is only performed on Unix; on other platforms (e.g. Windows)
+ /// it is a no-op, as directories cannot be portably opened and synced.
+ pub fn with_fsync(mut self, fsync: bool) -> Self {
+ self.fsync = fsync;
+ self
+ }
}
impl Config {
@@ -374,8 +400,9 @@ impl ObjectStore for LocalFileSystem {
}
let path = self.path_to_filesystem(location)?;
+ let fsync = self.fsync;
maybe_spawn_blocking(move || {
- let (mut file, staging_path) = new_staged_upload(&path)?;
+ let (mut file, staging_path) = new_staged_upload(&path, fsync)?;
let mut e_tag = None;
let err = match payload.iter().try_for_each(|x| file.write_all(x))
{
@@ -385,39 +412,26 @@ impl ObjectStore for LocalFileSystem {
path: path.to_string_lossy().to_string(),
})?;
e_tag = Some(get_etag(&metadata));
- // Explicitly close the file, checking for errors that would be silently ignored by drop.
- // On network filesystems (e.g. NFS), close can fail and indicate data loss.
- //
- // This also ensures the file is closed before rename, which is required by some FUSE
- // filesystems (e.g. Blobfuse) to trigger the upload operation.
- close_file(file).map_err(|source|
Error::UnableToCopyDataToFile { source })?;
+ // Atomically publish the staged file. When fsync is enabled the publish
+ // helpers flush the file's contents and the destination's parent directory to
+ // disk first, so a successful return is durable; the fsync calls are bundled
+ // into the helpers so a file-system modification can never be left unsynced.
match opts.mode {
- PutMode::Overwrite => match
std::fs::rename(&staging_path, &path) {
- Ok(_) => None,
- Err(source) => Some(Error::UnableToRenameFile {
source }),
- },
- PutMode::Create => match
std::fs::hard_link(&staging_path, &path) {
- Ok(_) => {
- let _ = std::fs::remove_file(&staging_path);
// Attempt to cleanup
- None
- }
- Err(source) => match source.kind() {
- ErrorKind::AlreadyExists =>
Some(Error::AlreadyExists {
- path: path.to_str().unwrap().to_string(),
- source,
- }),
- _ => Some(Error::UnableToRenameFile { source
}),
- },
- },
+ PutMode::Overwrite => {
+ finish_staged_rename(file, &staging_path, &path,
fsync).err()
+ }
+ PutMode::Create => {
+ finish_staged_hard_link(file, &staging_path,
&path, fsync).err()
+ }
PutMode::Update(_) => unreachable!(),
}
}
- Err(source) => Some(Error::UnableToCopyDataToFile { source }),
+ Err(source) => Some(Error::UnableToCopyDataToFile { source
}.into()),
};
if let Some(err) = err {
let _ = std::fs::remove_file(&staging_path); // Attempt to
cleanup
- return Err(err.into());
+ return Err(err);
}
Ok(PutResult {
@@ -442,8 +456,8 @@ impl ObjectStore for LocalFileSystem {
}
let dest = self.path_to_filesystem(location)?;
- let (file, src) = new_staged_upload(&dest)?;
- Ok(Box::new(LocalUpload::new(src, dest, file)))
+ let (file, src) = new_staged_upload(&dest, self.fsync)?;
+ Ok(Box::new(LocalUpload::new(src, dest, file, self.fsync)))
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
@@ -579,6 +593,7 @@ impl ObjectStore for LocalFileSystem {
let from = self.path_to_filesystem(from)?;
let to = self.path_to_filesystem(to)?;
+ let fsync = self.fsync;
match mode {
CopyMode::Overwrite => {
@@ -592,17 +607,22 @@ impl ObjectStore for LocalFileSystem {
maybe_spawn_blocking(move || {
loop {
let staged = staged_upload_path(&to, &id.to_string());
+ // Stage via a temporary hard link; the source is already durable so the
+ // staging link itself needs no fsync (the publish rename below fsyncs the
+ // shared parent directory).
match std::fs::hard_link(&from, &staged) {
- Ok(_) => {
- return std::fs::rename(&staged,
&to).map_err(|source| {
+ // `rename` bundles in the fsync of `to`'s parent directory.
+ Ok(_) => match rename(&staged, &to, fsync) {
+ Ok(_) => return Ok(()),
+ Err(source) => {
let _ = std::fs::remove_file(&staged); //
Attempt to clean up
- Error::UnableToCopyFile { from, to, source
}.into()
- });
- }
+ return Err(Error::UnableToCopyFile { from,
to, source }.into());
+ }
+ },
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => id += 1,
ErrorKind::NotFound => match from.exists() {
- true => create_parent_dirs(&to, source)?,
+ true => create_parent_dirs(&to, source,
fsync)?,
false => {
return Err(Error::NotFound { path:
from, source }.into());
}
@@ -619,7 +639,9 @@ impl ObjectStore for LocalFileSystem {
CopyMode::Create => {
maybe_spawn_blocking(move || {
loop {
- match std::fs::hard_link(&from, &to) {
+ // The source is an existing object that is already durable, so no file
+ // sync is needed; `hard_link` bundles in the fsync of `to`'s parent dir.
+ match hard_link(&from, &to, fsync) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => {
@@ -630,7 +652,7 @@ impl ObjectStore for LocalFileSystem {
.into());
}
ErrorKind::NotFound => match from.exists() {
- true => create_parent_dirs(&to, source)?,
+ true => create_parent_dirs(&to, source,
fsync)?,
false => {
return Err(Error::NotFound { path:
from, source }.into());
}
@@ -658,13 +680,18 @@ impl ObjectStore for LocalFileSystem {
RenameTargetMode::Overwrite => {
let from = self.path_to_filesystem(from)?;
let to = self.path_to_filesystem(to)?;
+ let fsync = self.fsync;
maybe_spawn_blocking(move || {
loop {
- match std::fs::rename(&from, &to) {
+ // Unlike multipart `complete`, there is no freshly written file to
+ // `sync_all` here: `from` is an existing, already-durable object and a
+ // rename only mutates directory entries. `rename` bundles in the fsync of
+ // both affected directories (destination, and source if it differs).
+ match rename(&from, &to, fsync) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::NotFound => match from.exists() {
- true => create_parent_dirs(&to, source)?,
+ true => create_parent_dirs(&to, source,
fsync)?,
false => {
return Err(Error::NotFound { path:
from, source }.into());
}
@@ -816,23 +843,168 @@ impl LocalFileSystem {
}
/// Creates the parent directories of `path` or returns an error based on `source` if no parent
-fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()>
{
+///
+/// When `fsync` is true, every directory created here is fsynced, up to and including the first
+/// pre-existing ancestor (whose entry list also changed), so the new directory entries are durable.
+fn create_parent_dirs(path: &std::path::Path, source: io::Error, fsync: bool)
-> Result<()> {
let parent = path.parent().ok_or_else(|| {
let path = path.to_path_buf();
Error::UnableToCreateFile { path, source }
})?;
+ // Record the deepest already-existing ancestor *before* creating any directories, so that
+ // afterwards we know exactly which directories are new and need to be fsynced.
+ let first_existing = fsync.then(|| {
+ let mut dir = parent;
+ while !dir.exists() {
+ match dir.parent() {
+ Some(p) => dir = p,
+ None => break,
+ }
+ }
+ dir.to_path_buf()
+ });
+
std::fs::create_dir_all(parent).map_err(|source| {
let path = parent.into();
Error::UnableToCreateDir { source, path }
})?;
+
+ if let Some(first_existing) = first_existing {
+ // Walk from `parent` up to `first_existing`, fsyncing each directory whose entries changed.
+ let mut dir = parent;
+ loop {
+ fsync_dir(dir).map_err(|source| Error::UnableToSyncFile {
+ source,
+ path: dir.into(),
+ })?;
+ if dir == first_existing {
+ break;
+ }
+ dir = match dir.parent() {
+ Some(p) => p,
+ None => break,
+ };
+ }
+ }
+ Ok(())
+}
+
+/// Renames `from` to `to`, then — when `fsync` is enabled — fsyncs the destination's parent
+/// directory (and the source's too, if it differs) so the moved directory entries are durable.
+///
+/// The directory fsync is bundled in deliberately: every durable rename goes through here, so the
+/// post-rename fsync can never be forgotten at an individual call site.
+fn rename(from: &std::path::Path, to: &std::path::Path, fsync: bool) ->
io::Result<()> {
+ std::fs::rename(from, to)?;
+ if fsync {
+ fsync_parent_dir(to)?;
+ // A cross-directory move also removes an entry from the source directory.
+ if from.parent() != to.parent() {
+ fsync_parent_dir(from)?;
+ }
+ }
+ Ok(())
+}
+
+/// Hard-links `original` to `link`, then — when `fsync` is enabled — fsyncs `link`'s parent
+/// directory so the new directory entry is durable.
+///
+/// As with [`rename`], the directory fsync is bundled in so it cannot be forgotten at a call site.
+fn hard_link(original: &std::path::Path, link: &std::path::Path, fsync: bool)
-> io::Result<()> {
+ std::fs::hard_link(original, link)?;
+ if fsync {
+ fsync_parent_dir(link)?;
+ }
+ Ok(())
+}
+
+/// Durably publishes the freshly-written staging file `file` (located at `src`) to `dest` via a
+/// rename.
+///
+/// When `fsync` is enabled, the file's contents are flushed before — and `dest`'s parent
+/// directory after — the rename, so a successful return is durable. The file is always closed
+/// before the rename (checking for close errors): required for NFS error detection and for some
+/// FUSE filesystems (e.g. Blobfuse) that only commit the data on close.
+fn finish_staged_rename(
+ file: File,
+ src: &std::path::Path,
+ dest: &std::path::Path,
+ fsync: bool,
+) -> Result<()> {
+ sync_and_close(file, src, fsync)?;
+ rename(src, dest, fsync).map_err(|source| Error::UnableToRenameFile {
source })?;
+ Ok(())
+}
+
+/// Like [`finish_staged_rename`] but publishes via a hard link (`PutMode::Create` semantics): the
+/// staging file is linked to `dest` and then removed. Returns [`Error::AlreadyExists`] if `dest`
+/// already exists.
+fn finish_staged_hard_link(
+ file: File,
+ src: &std::path::Path,
+ dest: &std::path::Path,
+ fsync: bool,
+) -> Result<()> {
+ sync_and_close(file, src, fsync)?;
+ match hard_link(src, dest, fsync) {
+ Ok(()) => {
+ let _ = std::fs::remove_file(src); // Attempt to cleanup
+ Ok(())
+ }
+ Err(source) => match source.kind() {
+ ErrorKind::AlreadyExists => Err(Error::AlreadyExists {
+ path: dest.to_str().unwrap().to_string(),
+ source,
+ }
+ .into()),
+ _ => Err(Error::UnableToRenameFile { source }.into()),
+ },
+ }
+}
+
+/// Flushes the freshly-written `file`'s contents to disk (when `fsync` is enabled) and then
+/// closes it, checking for close errors that dropping the [`File`] would silently ignore.
+fn sync_and_close(file: File, path: &std::path::Path, fsync: bool) ->
Result<()> {
+ if fsync {
+ file.sync_all().map_err(|source| Error::UnableToSyncFile {
+ source,
+ path: path.into(),
+ })?;
+ }
+ close_file(file).map_err(|source| Error::UnableToCopyDataToFile { source
})?;
Ok(())
}
+/// Fsyncs the parent directory of `path` so a change to its directory entries (e.g. one just made
+/// by a rename or hard link) is durable. A no-op when `path` has no parent.
+fn fsync_parent_dir(path: &std::path::Path) -> io::Result<()> {
+ match path.parent() {
+ Some(parent) => fsync_dir(parent),
+ None => Ok(()),
+ }
+}
+
+/// Fsyncs `dir_path` so that changes to its directory entries are durable.
+///
+/// This is only meaningful on Unix; on other platforms (e.g. Windows) directories cannot be
+/// portably opened as a [`File`] and synced, so this is a no-op.
+fn fsync_dir(dir_path: &std::path::Path) -> io::Result<()> {
+ #[cfg(target_family = "unix")]
+ {
+ File::open(dir_path)?.sync_all()
+ }
+ #[cfg(not(target_family = "unix"))]
+ {
+ let _ = dir_path;
+ Ok(())
+ }
+}
+
/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `path`
///
-/// Creates any directories if necessary
-fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
+/// Creates any directories if necessary, fsyncing them when `fsync` is enabled
+fn new_staged_upload(base: &std::path::Path, fsync: bool) -> Result<(File,
PathBuf)> {
let mut multipart_id = 1;
loop {
let suffix = multipart_id.to_string();
@@ -842,7 +1014,7 @@ fn new_staged_upload(base: &std::path::Path) ->
Result<(File, PathBuf)> {
Ok(f) => return Ok((f, path)),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => multipart_id += 1,
- ErrorKind::NotFound => create_parent_dirs(&path, source)?,
+ ErrorKind::NotFound => create_parent_dirs(&path, source,
fsync)?,
_ => return Err(Error::UnableToOpenFile { source, path
}.into()),
},
}
@@ -865,6 +1037,8 @@ struct LocalUpload {
src: Option<PathBuf>,
/// The next offset to write into the file
offset: u64,
+ /// Whether to fsync the file and its parent directory on completion
+ fsync: bool,
}
#[derive(Debug)]
@@ -874,7 +1048,7 @@ struct UploadState {
}
impl LocalUpload {
- pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
+ pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File, fsync: bool) ->
Self {
Self {
state: Arc::new(UploadState {
dest,
@@ -882,6 +1056,7 @@ impl LocalUpload {
}),
src: Some(src),
offset: 0,
+ fsync,
}
}
}
@@ -913,6 +1088,7 @@ impl MultipartUpload for LocalUpload {
async fn complete(&mut self) -> Result<PutResult> {
let src = self.src.take().ok_or(Error::Aborted)?;
let s = Arc::clone(&self.state);
+ let fsync = self.fsync;
maybe_spawn_blocking(move || {
// Ensure no inflight writes
let mut guard = s.file.lock();
@@ -923,15 +1099,10 @@ impl MultipartUpload for LocalUpload {
path: src.to_string_lossy().to_string(),
})?;
- // Explicitly close the file, checking for errors that would be silently ignored by drop.
- // On network filesystems (e.g. NFS), close can fail and indicate data loss.
- //
- // This also ensures the file is closed before rename, which is required by some FUSE
- // filesystems (e.g. Blobfuse) to trigger the upload operation.
- close_file(file).map_err(|source| Error::UnableToCopyDataToFile {
source })?;
-
- std::fs::rename(&src, &s.dest)
- .map_err(|source| Error::UnableToRenameFile { source })?;
+ // Durably publish the freshly-written staging file: flush its contents, close it, then
+ // rename it into place and fsync the destination's parent directory (the fsync calls
+ // are bundled into the helper and only run when fsync is enabled).
+ finish_staged_rename(file, &src, &s.dest, fsync)?;
Ok(PutResult {
e_tag: Some(get_etag(&metadata)),
@@ -1324,6 +1495,70 @@ mod tests {
put_opts(&integration, false).await;
}
+ #[tokio::test]
+ #[cfg(target_family = "unix")]
+ async fn file_test_fsync() {
+ // Run the full integration suite with fsync enabled to ensure the durability code
+ // paths (file sync + directory fsync on put/copy/rename/multipart, including recursive
+ // directory creation) behave identically to the default.
+ let root = TempDir::new().unwrap();
+ let integration = LocalFileSystem::new_with_prefix(root.path())
+ .unwrap()
+ .with_fsync(true);
+
+ put_get_delete_list(&integration).await;
+ list_with_offset_exclusivity(&integration).await;
+ get_opts(&integration).await;
+ list_uses_directories_correctly(&integration).await;
+ list_with_delimiter(&integration).await;
+ rename_and_copy(&integration).await;
+ copy_if_not_exists(&integration).await;
+ copy_rename_nonexistent_object(&integration).await;
+ stream_get(&integration).await;
+ put_opts(&integration, false).await;
+ }
+
+ #[tokio::test]
+ async fn fsync_creates_nested_dirs() {
+ // Exercises the recursive directory fsync in `create_parent_dirs`: every directory
+ // component is newly created, so each must be synced up to the pre-existing root.
+ let root = TempDir::new().unwrap();
+ let integration = LocalFileSystem::new_with_prefix(root.path())
+ .unwrap()
+ .with_fsync(true);
+
+ let data = Bytes::from("arbitrary data");
+
+ // `put` (overwrite) into a deeply nested, non-existent directory tree
+ let location = Path::from("a/b/c/d/put_file");
+ integration
+ .put(&location, data.clone().into())
+ .await
+ .unwrap();
+ let read = integration
+ .get(&location)
+ .await
+ .unwrap()
+ .bytes()
+ .await
+ .unwrap();
+ assert_eq!(read, data);
+
+ // multipart upload into another nested tree
+ let location = Path::from("e/f/g/multipart_file");
+ let mut upload = integration.put_multipart(&location).await.unwrap();
+ upload.put_part(data.clone().into()).await.unwrap();
+ upload.complete().await.unwrap();
+ let read = integration
+ .get(&location)
+ .await
+ .unwrap()
+ .bytes()
+ .await
+ .unwrap();
+ assert_eq!(read, data);
+ }
+
#[test]
#[cfg(target_family = "unix")]
fn test_non_tokio() {