This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f6729673 Make LocalFileSystem::put atomic (#3780) (#3781)
4f6729673 is described below

commit 4f6729673b9c97e3daa8983ef52c51033b72a741
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Mar 2 16:57:52 2023 +0000

    Make LocalFileSystem::put atomic (#3780) (#3781)
    
    * Make LocalFileSystem::put atomic (#3780)
    
    * Clippy
    
    * Add list test
---
 object_store/src/lib.rs   |   6 +++
 object_store/src/local.rs | 106 ++++++++++++++++++++++------------------------
 2 files changed, 56 insertions(+), 56 deletions(-)

diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 6a3275bb0..671b22d0f 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -840,6 +840,12 @@ mod tests {
             crate::Error::NotFound { .. }
         ));
 
+        let files = flatten_list_stream(storage, None).await.unwrap();
+        assert_eq!(&files, &[]);
+
+        let result = storage.list_with_delimiter(None).await.unwrap();
+        assert_eq!(&result.objects, &[]);
+
         writer.shutdown().await.unwrap();
         let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
         assert_eq!(bytes_expected, bytes_written);
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 9a518ba47..f1733f54b 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -27,8 +27,8 @@ use futures::future::BoxFuture;
 use futures::FutureExt;
 use futures::{stream::BoxStream, StreamExt};
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use std::fs::{metadata, symlink_metadata, File};
-use std::io::{Read, Seek, SeekFrom, Write};
+use std::fs::{metadata, symlink_metadata, File, OpenOptions};
+use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -65,6 +65,11 @@ pub(crate) enum Error {
         source: io::Error,
     },
 
+    #[snafu(display("Unable to rename file: {}", source))]
+    UnableToRenameFile {
+        source: io::Error,
+    },
+
     #[snafu(display("Unable to create dir {}: {}", path.display(), source))]
     UnableToCreateDir {
         source: io::Error,
@@ -266,11 +271,14 @@ impl ObjectStore for LocalFileSystem {
         let path = self.config.path_to_filesystem(location)?;
 
         maybe_spawn_blocking(move || {
-            let mut file = open_writable_file(&path)?;
+            let (mut file, suffix) = new_staged_upload(&path)?;
+            let staging_path = staged_upload_path(&path, &suffix);
 
             file.write_all(&bytes)
                 .context(UnableToCopyDataToFileSnafu)?;
 
+            std::fs::rename(staging_path, path).context(UnableToRenameFileSnafu)?;
+
             Ok(())
         })
         .await
@@ -282,28 +290,10 @@ impl ObjectStore for LocalFileSystem {
     ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
         let dest = self.config.path_to_filesystem(location)?;
 
-        // Generate an id in case of concurrent writes
-        let mut multipart_id = 1;
-
-        // Will write to a temporary path
-        let staging_path = loop {
-            let staging_path = get_upload_stage_path(&dest, &multipart_id.to_string());
-
-            match std::fs::metadata(&staging_path) {
-                Err(err) if err.kind() == io::ErrorKind::NotFound => break staging_path,
-                Err(err) => {
-                    return Err(Error::UnableToCopyDataToFile { source: err }.into())
-                }
-                Ok(_) => multipart_id += 1,
-            }
-        };
-        let multipart_id = multipart_id.to_string();
-
-        let file = open_writable_file(&staging_path)?;
-
+        let (file, suffix) = new_staged_upload(&dest)?;
         Ok((
-            multipart_id.clone(),
-            Box::new(LocalUpload::new(dest, multipart_id, Arc::new(file))),
+            suffix.clone(),
+            Box::new(LocalUpload::new(dest, suffix, Arc::new(file))),
         ))
     }
 
@@ -313,7 +303,7 @@ impl ObjectStore for LocalFileSystem {
         multipart_id: &MultipartId,
     ) -> Result<()> {
         let dest = self.config.path_to_filesystem(location)?;
-        let staging_path: PathBuf = get_upload_stage_path(&dest, multipart_id);
+        let staging_path: PathBuf = staged_upload_path(&dest, multipart_id);
 
         maybe_spawn_blocking(move || {
             std::fs::remove_file(&staging_path)
@@ -553,9 +543,40 @@ impl ObjectStore for LocalFileSystem {
     }
 }
 
-fn get_upload_stage_path(dest: &std::path::Path, multipart_id: &MultipartId) -> PathBuf {
+/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `suffix`
+///
+/// Creates any directories if necessary
+fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> {
+    let mut multipart_id = 1;
+    loop {
+        let suffix = multipart_id.to_string();
+        let path = staged_upload_path(base, &suffix);
+        let mut options = OpenOptions::new();
+        match options.read(true).write(true).create_new(true).open(&path) {
+            Ok(f) => return Ok((f, suffix)),
+            Err(e) if e.kind() == ErrorKind::AlreadyExists => {
+                multipart_id += 1;
+            }
+            Err(err) if err.kind() == ErrorKind::NotFound => {
+                let parent = path
+                    .parent()
+                    .context(UnableToCreateFileSnafu { path: &path, err })?;
+
+                std::fs::create_dir_all(parent)
+                    .context(UnableToCreateDirSnafu { path: parent })?;
+
+                continue;
+            }
+            Err(source) => return Err(Error::UnableToOpenFile { source, path }.into()),
+        }
+    }
+}
+
+/// Returns the unique upload for the given path and suffix
+fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf {
     let mut staging_path = dest.as_os_str().to_owned();
-    staging_path.push(format!("#{multipart_id}"));
+    staging_path.push("#");
+    staging_path.push(suffix);
     staging_path.into()
 }
 
@@ -700,7 +721,7 @@ impl AsyncWrite for LocalUpload {
                         Poll::Ready(res) => {
                             res?;
                             let staging_path =
-                                get_upload_stage_path(&self.dest, &self.multipart_id);
+                                staged_upload_path(&self.dest, &self.multipart_id);
                             let dest = self.dest.clone();
                             self.inner_state = LocalUploadState::Committing(Box::pin(
                                 runtime
@@ -741,7 +762,7 @@ impl AsyncWrite for LocalUpload {
                 }
             }
         } else {
-            let staging_path = get_upload_stage_path(&self.dest, &self.multipart_id);
+            let staging_path = staged_upload_path(&self.dest, &self.multipart_id);
             match &mut self.inner_state {
                 LocalUploadState::Idle(file) => {
                     let file = Arc::clone(file);
@@ -802,33 +823,6 @@ fn open_file(path: &PathBuf) -> Result<File> {
     Ok(file)
 }
 
-fn open_writable_file(path: &PathBuf) -> Result<File> {
-    match File::create(path) {
-        Ok(f) => Ok(f),
-        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
-            let parent = path
-                .parent()
-                .context(UnableToCreateFileSnafu { path: &path, err })?;
-            std::fs::create_dir_all(parent)
-                .context(UnableToCreateDirSnafu { path: parent })?;
-
-            match File::create(path) {
-                Ok(f) => Ok(f),
-                Err(err) => Err(Error::UnableToCreateFile {
-                    path: path.to_path_buf(),
-                    err,
-                }
-                .into()),
-            }
-        }
-        Err(err) => Err(Error::UnableToCreateFile {
-            path: path.to_path_buf(),
-            err,
-        }
-        .into()),
-    }
-}
-
 fn convert_entry(entry: DirEntry, location: Path) -> Result<ObjectMeta> {
     let metadata = entry
         .metadata()

Reply via email to