This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 4f6729673 Make LocalFileSystem::put atomic (#3780) (#3781)
4f6729673 is described below
commit 4f6729673b9c97e3daa8983ef52c51033b72a741
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Mar 2 16:57:52 2023 +0000
Make LocalFileSystem::put atomic (#3780) (#3781)
* Make LocalFileSystem::put atomic (#3780)
* Clippy
* Add list test
---
object_store/src/lib.rs | 6 +++
object_store/src/local.rs | 106 ++++++++++++++++++++++------------------------
2 files changed, 56 insertions(+), 56 deletions(-)
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 6a3275bb0..671b22d0f 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -840,6 +840,12 @@ mod tests {
crate::Error::NotFound { .. }
));
+ let files = flatten_list_stream(storage, None).await.unwrap();
+ assert_eq!(&files, &[]);
+
+ let result = storage.list_with_delimiter(None).await.unwrap();
+ assert_eq!(&result.objects, &[]);
+
writer.shutdown().await.unwrap();
let bytes_written =
storage.get(&location).await.unwrap().bytes().await.unwrap();
assert_eq!(bytes_expected, bytes_written);
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 9a518ba47..f1733f54b 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -27,8 +27,8 @@ use futures::future::BoxFuture;
use futures::FutureExt;
use futures::{stream::BoxStream, StreamExt};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use std::fs::{metadata, symlink_metadata, File};
-use std::io::{Read, Seek, SeekFrom, Write};
+use std::fs::{metadata, symlink_metadata, File, OpenOptions};
+use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
@@ -65,6 +65,11 @@ pub(crate) enum Error {
source: io::Error,
},
+ #[snafu(display("Unable to rename file: {}", source))]
+ UnableToRenameFile {
+ source: io::Error,
+ },
+
#[snafu(display("Unable to create dir {}: {}", path.display(), source))]
UnableToCreateDir {
source: io::Error,
@@ -266,11 +271,14 @@ impl ObjectStore for LocalFileSystem {
let path = self.config.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
- let mut file = open_writable_file(&path)?;
+ let (mut file, suffix) = new_staged_upload(&path)?;
+ let staging_path = staged_upload_path(&path, &suffix);
file.write_all(&bytes)
.context(UnableToCopyDataToFileSnafu)?;
+ std::fs::rename(staging_path, path).context(UnableToRenameFileSnafu)?;
+
Ok(())
})
.await
@@ -282,28 +290,10 @@ impl ObjectStore for LocalFileSystem {
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let dest = self.config.path_to_filesystem(location)?;
- // Generate an id in case of concurrent writes
- let mut multipart_id = 1;
-
- // Will write to a temporary path
- let staging_path = loop {
- let staging_path = get_upload_stage_path(&dest, &multipart_id.to_string());
-
- match std::fs::metadata(&staging_path) {
- Err(err) if err.kind() == io::ErrorKind::NotFound => break staging_path,
- Err(err) => {
- return Err(Error::UnableToCopyDataToFile { source: err }.into())
- }
- Ok(_) => multipart_id += 1,
- }
- };
- let multipart_id = multipart_id.to_string();
-
- let file = open_writable_file(&staging_path)?;
-
+ let (file, suffix) = new_staged_upload(&dest)?;
Ok((
- multipart_id.clone(),
- Box::new(LocalUpload::new(dest, multipart_id, Arc::new(file))),
+ suffix.clone(),
+ Box::new(LocalUpload::new(dest, suffix, Arc::new(file))),
))
}
@@ -313,7 +303,7 @@ impl ObjectStore for LocalFileSystem {
multipart_id: &MultipartId,
) -> Result<()> {
let dest = self.config.path_to_filesystem(location)?;
- let staging_path: PathBuf = get_upload_stage_path(&dest, multipart_id);
+ let staging_path: PathBuf = staged_upload_path(&dest, multipart_id);
maybe_spawn_blocking(move || {
std::fs::remove_file(&staging_path)
@@ -553,9 +543,40 @@ impl ObjectStore for LocalFileSystem {
}
}
-fn get_upload_stage_path(dest: &std::path::Path, multipart_id: &MultipartId) -> PathBuf {
+/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `suffix`
+///
+/// Creates any directories if necessary
+fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> {
+ let mut multipart_id = 1;
+ loop {
+ let suffix = multipart_id.to_string();
+ let path = staged_upload_path(base, &suffix);
+ let mut options = OpenOptions::new();
+ match options.read(true).write(true).create_new(true).open(&path) {
+ Ok(f) => return Ok((f, suffix)),
+ Err(e) if e.kind() == ErrorKind::AlreadyExists => {
+ multipart_id += 1;
+ }
+ Err(err) if err.kind() == ErrorKind::NotFound => {
+ let parent = path
+ .parent()
+ .context(UnableToCreateFileSnafu { path: &path, err })?;
+
+ std::fs::create_dir_all(parent)
+ .context(UnableToCreateDirSnafu { path: parent })?;
+
+ continue;
+ }
+ Err(source) => return Err(Error::UnableToOpenFile { source, path }.into()),
+ }
+ }
+}
+
+/// Returns the unique upload for the given path and suffix
+fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf {
let mut staging_path = dest.as_os_str().to_owned();
- staging_path.push(format!("#{multipart_id}"));
+ staging_path.push("#");
+ staging_path.push(suffix);
staging_path.into()
}
@@ -700,7 +721,7 @@ impl AsyncWrite for LocalUpload {
Poll::Ready(res) => {
res?;
let staging_path =
- get_upload_stage_path(&self.dest, &self.multipart_id);
+ staged_upload_path(&self.dest, &self.multipart_id);
let dest = self.dest.clone();
self.inner_state =
LocalUploadState::Committing(Box::pin(
runtime
@@ -741,7 +762,7 @@ impl AsyncWrite for LocalUpload {
}
}
} else {
- let staging_path = get_upload_stage_path(&self.dest, &self.multipart_id);
+ let staging_path = staged_upload_path(&self.dest, &self.multipart_id);
match &mut self.inner_state {
LocalUploadState::Idle(file) => {
let file = Arc::clone(file);
@@ -802,33 +823,6 @@ fn open_file(path: &PathBuf) -> Result<File> {
Ok(file)
}
-fn open_writable_file(path: &PathBuf) -> Result<File> {
- match File::create(path) {
- Ok(f) => Ok(f),
- Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
- let parent = path
- .parent()
- .context(UnableToCreateFileSnafu { path: &path, err })?;
- std::fs::create_dir_all(parent)
- .context(UnableToCreateDirSnafu { path: parent })?;
-
- match File::create(path) {
- Ok(f) => Ok(f),
- Err(err) => Err(Error::UnableToCreateFile {
- path: path.to_path_buf(),
- err,
- }
- .into()),
- }
- }
- Err(err) => Err(Error::UnableToCreateFile {
- path: path.to_path_buf(),
- err,
- }
- .into()),
- }
-}
-
fn convert_entry(entry: DirEntry, location: Path) -> Result<ObjectMeta> {
let metadata = entry
.metadata()