This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ab87abdd69 Generate `ETag`s for `InMemory` and `LocalFileSystem` (#4879) (#4922)
ab87abdd69 is described below
commit ab87abdd69ab787fdf247cf36f04abc1fbfa6266
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Oct 17 12:39:34 2023 +0100
Generate `ETag`s for `InMemory` and `LocalFileSystem` (#4879) (#4922)
* Support ETag in InMemory (#4879)
* Add LocalFileSystem Etag
* Review feedback
* Review feedback
---
object_store/src/lib.rs | 206 ++++++++++++++++++++++++++++++++++-----------
object_store/src/local.rs | 37 +++++---
object_store/src/memory.rs | 149 ++++++++++++++++++--------------
3 files changed, 268 insertions(+), 124 deletions(-)
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index ff0a46533d..b79042e3cd 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -698,12 +698,28 @@ pub struct GetOptions {
/// Request will succeed if the `ObjectMeta::e_tag` matches
/// otherwise returning [`Error::Precondition`]
///
- /// <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
+ /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
+ ///
+ /// Examples:
+ ///
+ /// ```text
+ /// If-Match: "xyzzy"
+ /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
+ /// If-Match: *
+ /// ```
pub if_match: Option<String>,
/// Request will succeed if the `ObjectMeta::e_tag` does not match
/// otherwise returning [`Error::NotModified`]
///
- /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.2>
+ /// See <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.2>
+ ///
+ /// Examples:
+ ///
+ /// ```text
+ /// If-None-Match: "xyzzy"
+ /// If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
+ /// If-None-Match: *
+ /// ```
pub if_none_match: Option<String>,
/// Request will succeed if the object has been modified since
///
@@ -730,25 +746,41 @@ pub struct GetOptions {
impl GetOptions {
/// Returns an error if the modification conditions on this request are
not satisfied
- fn check_modified(
- &self,
- location: &Path,
- last_modified: DateTime<Utc>,
- ) -> Result<()> {
- if let Some(date) = self.if_modified_since {
- if last_modified <= date {
- return Err(Error::NotModified {
- path: location.to_string(),
- source: format!("{} >= {}", date, last_modified).into(),
+ ///
+ /// <https://datatracker.ietf.org/doc/html/rfc7232#section-6>
+ fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> {
+ // The use of the invalid etag "*" means no ETag is equivalent to
never matching
+ let etag = meta.e_tag.as_deref().unwrap_or("*");
+ let last_modified = meta.last_modified;
+
+ if let Some(m) = &self.if_match {
+ if m != "*" && m.split(',').map(str::trim).all(|x| x != etag) {
+ return Err(Error::Precondition {
+ path: meta.location.to_string(),
+ source: format!("{etag} does not match {m}").into(),
});
}
- }
-
- if let Some(date) = self.if_unmodified_since {
+ } else if let Some(date) = self.if_unmodified_since {
if last_modified > date {
return Err(Error::Precondition {
- path: location.to_string(),
- source: format!("{} < {}", date, last_modified).into(),
+ path: meta.location.to_string(),
+ source: format!("{date} < {last_modified}").into(),
+ });
+ }
+ }
+
+ if let Some(m) = &self.if_none_match {
+ if m == "*" || m.split(',').map(str::trim).any(|x| x == etag) {
+ return Err(Error::NotModified {
+ path: meta.location.to_string(),
+ source: format!("{etag} matches {m}").into(),
+ });
+ }
+ } else if let Some(date) = self.if_modified_since {
+ if last_modified <= date {
+ return Err(Error::NotModified {
+ path: meta.location.to_string(),
+ source: format!("{date} >= {last_modified}").into(),
});
}
}
@@ -952,6 +984,7 @@ mod test_util {
mod tests {
use super::*;
use crate::test_util::flatten_list_stream;
+ use chrono::TimeZone;
use rand::{thread_rng, Rng};
use tokio::io::AsyncWriteExt;
@@ -1359,33 +1392,32 @@ mod tests {
Err(e) => panic!("{e}"),
}
- if let Some(tag) = meta.e_tag {
- let options = GetOptions {
- if_match: Some(tag.clone()),
- ..GetOptions::default()
- };
- storage.get_opts(&path, options).await.unwrap();
-
- let options = GetOptions {
- if_match: Some("invalid".to_string()),
- ..GetOptions::default()
- };
- let err = storage.get_opts(&path, options).await.unwrap_err();
- assert!(matches!(err, Error::Precondition { .. }), "{err}");
-
- let options = GetOptions {
- if_none_match: Some(tag.clone()),
- ..GetOptions::default()
- };
- let err = storage.get_opts(&path, options).await.unwrap_err();
- assert!(matches!(err, Error::NotModified { .. }), "{err}");
-
- let options = GetOptions {
- if_none_match: Some("invalid".to_string()),
- ..GetOptions::default()
- };
- storage.get_opts(&path, options).await.unwrap();
- }
+ let tag = meta.e_tag.unwrap();
+ let options = GetOptions {
+ if_match: Some(tag.clone()),
+ ..GetOptions::default()
+ };
+ storage.get_opts(&path, options).await.unwrap();
+
+ let options = GetOptions {
+ if_match: Some("invalid".to_string()),
+ ..GetOptions::default()
+ };
+ let err = storage.get_opts(&path, options).await.unwrap_err();
+ assert!(matches!(err, Error::Precondition { .. }), "{err}");
+
+ let options = GetOptions {
+ if_none_match: Some(tag.clone()),
+ ..GetOptions::default()
+ };
+ let err = storage.get_opts(&path, options).await.unwrap_err();
+ assert!(matches!(err, Error::NotModified { .. }), "{err}");
+
+ let options = GetOptions {
+ if_none_match: Some("invalid".to_string()),
+ ..GetOptions::default()
+ };
+ storage.get_opts(&path, options).await.unwrap();
}
/// Returns a chunk of length `chunk_length`
@@ -1697,8 +1729,86 @@ mod tests {
assert!(stream.next().await.is_none());
}
- // Tests TODO:
- // GET nonexisting location (in_memory/file)
- // DELETE nonexisting location
- // PUT overwriting
+ #[test]
+ fn test_preconditions() {
+ let mut meta = ObjectMeta {
+ location: Path::from("test"),
+ last_modified: Utc.timestamp_nanos(100),
+ size: 100,
+ e_tag: Some("123".to_string()),
+ };
+
+ let mut options = GetOptions::default();
+ options.check_preconditions(&meta).unwrap();
+
+ options.if_modified_since = Some(Utc.timestamp_nanos(50));
+ options.check_preconditions(&meta).unwrap();
+
+ options.if_modified_since = Some(Utc.timestamp_nanos(100));
+ options.check_preconditions(&meta).unwrap_err();
+
+ options.if_modified_since = Some(Utc.timestamp_nanos(101));
+ options.check_preconditions(&meta).unwrap_err();
+
+ options = GetOptions::default();
+
+ options.if_unmodified_since = Some(Utc.timestamp_nanos(50));
+ options.check_preconditions(&meta).unwrap_err();
+
+ options.if_unmodified_since = Some(Utc.timestamp_nanos(100));
+ options.check_preconditions(&meta).unwrap();
+
+ options.if_unmodified_since = Some(Utc.timestamp_nanos(101));
+ options.check_preconditions(&meta).unwrap();
+
+ options = GetOptions::default();
+
+ options.if_match = Some("123".to_string());
+ options.check_preconditions(&meta).unwrap();
+
+ options.if_match = Some("123,354".to_string());
+ options.check_preconditions(&meta).unwrap();
+
+ options.if_match = Some("354, 123,".to_string());
+ options.check_preconditions(&meta).unwrap();
+
+ options.if_match = Some("354".to_string());
+ options.check_preconditions(&meta).unwrap_err();
+
+ options.if_match = Some("*".to_string());
+ options.check_preconditions(&meta).unwrap();
+
+ // If-Match takes precedence
+ options.if_unmodified_since = Some(Utc.timestamp_nanos(200));
+ options.check_preconditions(&meta).unwrap();
+
+ options = GetOptions::default();
+
+ options.if_none_match = Some("123".to_string());
+ options.check_preconditions(&meta).unwrap_err();
+
+ options.if_none_match = Some("*".to_string());
+ options.check_preconditions(&meta).unwrap_err();
+
+ options.if_none_match = Some("1232".to_string());
+ options.check_preconditions(&meta).unwrap();
+
+ options.if_none_match = Some("23, 123".to_string());
+ options.check_preconditions(&meta).unwrap_err();
+
+ // If-None-Match takes precedence
+ options.if_modified_since = Some(Utc.timestamp_nanos(10));
+ options.check_preconditions(&meta).unwrap_err();
+
+ // Check missing ETag
+ meta.e_tag = None;
+ options = GetOptions::default();
+
+ options.if_none_match = Some("*".to_string()); // Fails if any file
exists
+ options.check_preconditions(&meta).unwrap_err();
+
+ options = GetOptions::default();
+ options.if_match = Some("*".to_string()); // Passes if file exists
+ options.check_preconditions(&meta).unwrap();
+ }
}
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 3ed63a4108..3d4a02a1e9 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -365,23 +365,12 @@ impl ObjectStore for LocalFileSystem {
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
- if options.if_match.is_some() || options.if_none_match.is_some() {
- return Err(super::Error::NotSupported {
- source: "ETags not supported by
LocalFileSystem".to_string().into(),
- });
- }
-
let location = location.clone();
let path = self.config.path_to_filesystem(&location)?;
maybe_spawn_blocking(move || {
let (file, metadata) = open_file(&path)?;
- if options.if_unmodified_since.is_some()
- || options.if_modified_since.is_some()
- {
- options.check_modified(&location, last_modified(&metadata))?;
- }
-
let meta = convert_metadata(metadata, location)?;
+ options.check_preconditions(&meta)?;
Ok(GetResult {
payload: GetResultPayload::File(file, path),
@@ -965,7 +954,7 @@ fn convert_entry(entry: DirEntry, location: Path) ->
Result<ObjectMeta> {
convert_metadata(metadata, location)
}
-fn last_modified(metadata: &std::fs::Metadata) -> DateTime<Utc> {
+fn last_modified(metadata: &Metadata) -> DateTime<Utc> {
metadata
.modified()
.expect("Modified file time should be supported on this platform")
@@ -977,15 +966,35 @@ fn convert_metadata(metadata: Metadata, location: Path)
-> Result<ObjectMeta> {
let size =
usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
path: location.as_ref(),
})?;
+ let inode = get_inode(&metadata);
+ let mtime = last_modified.timestamp_micros();
+
+ // Use an ETag scheme based on that used by many popular HTTP servers
+ // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
+ //
<https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
+ let etag = format!("{inode:x}-{mtime:x}-{size:x}");
Ok(ObjectMeta {
location,
last_modified,
size,
- e_tag: None,
+ e_tag: Some(etag),
})
}
+#[cfg(unix)]
+/// We include the inode when available to yield an ETag more resistant to collisions
+/// and as used by popular web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)
+fn get_inode(metadata: &Metadata) -> u64 {
+ std::os::unix::fs::MetadataExt::ino(metadata)
+}
+
+#[cfg(not(unix))]
+/// On platforms where an inode isn't available, fallback to just relying on size and mtime
+fn get_inode(_metadata: &Metadata) -> u64 {
+ 0
+}
+
/// Convert walkdir results and converts not-found errors into `None`.
/// Convert broken symlinks to `None`.
fn convert_walkdir_result(
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 0e229885b0..f638ed6d7a 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -35,9 +35,6 @@ use std::sync::Arc;
use std::task::Poll;
use tokio::io::AsyncWrite;
-type Entry = (Bytes, DateTime<Utc>);
-type StorageType = Arc<RwLock<BTreeMap<Path, Entry>>>;
-
/// A specialized `Error` for in-memory object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
@@ -80,7 +77,41 @@ impl From<Error> for super::Error {
/// storage provider.
#[derive(Debug, Default)]
pub struct InMemory {
- storage: StorageType,
+ storage: SharedStorage,
+}
+
+#[derive(Debug, Clone)]
+struct Entry {
+ data: Bytes,
+ last_modified: DateTime<Utc>,
+ e_tag: usize,
+}
+
+impl Entry {
+ fn new(data: Bytes, last_modified: DateTime<Utc>, e_tag: usize) -> Self {
+ Self {
+ data,
+ last_modified,
+ e_tag,
+ }
+ }
+}
+
+#[derive(Debug, Default, Clone)]
+struct Storage {
+ next_etag: usize,
+ map: BTreeMap<Path, Entry>,
+}
+
+type SharedStorage = Arc<RwLock<Storage>>;
+
+impl Storage {
+ fn insert(&mut self, location: &Path, bytes: Bytes) {
+ let etag = self.next_etag;
+ self.next_etag += 1;
+ let entry = Entry::new(bytes, Utc::now(), etag);
+ self.map.insert(location.clone(), entry);
+ }
}
impl std::fmt::Display for InMemory {
@@ -92,9 +123,7 @@ impl std::fmt::Display for InMemory {
#[async_trait]
impl ObjectStore for InMemory {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
- self.storage
- .write()
- .insert(location.clone(), (bytes, Utc::now()));
+ self.storage.write().insert(location, bytes);
Ok(())
}
@@ -128,33 +157,30 @@ impl ObjectStore for InMemory {
Ok(Box::new(InMemoryAppend {
location: location.clone(),
data: Vec::<u8>::new(),
- storage: StorageType::clone(&self.storage),
+ storage: SharedStorage::clone(&self.storage),
}))
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
- if options.if_match.is_some() || options.if_none_match.is_some() {
- return Err(super::Error::NotSupported {
- source: "ETags not supported by InMemory".to_string().into(),
- });
- }
- let (data, last_modified) = self.entry(location).await?;
- options.check_modified(location, last_modified)?;
+ let entry = self.entry(location).await?;
+ let e_tag = entry.e_tag.to_string();
+
let meta = ObjectMeta {
location: location.clone(),
- last_modified,
- size: data.len(),
- e_tag: None,
+ last_modified: entry.last_modified,
+ size: entry.data.len(),
+ e_tag: Some(e_tag),
};
+ options.check_preconditions(&meta)?;
let (range, data) = match options.range {
Some(range) => {
- let len = data.len();
+ let len = entry.data.len();
ensure!(range.end <= len, OutOfRangeSnafu { range, len });
ensure!(range.start <= range.end, BadRangeSnafu { range });
- (range.clone(), data.slice(range))
+ (range.clone(), entry.data.slice(range))
}
- None => (0..data.len(), data),
+ None => (0..entry.data.len(), entry.data),
};
let stream = futures::stream::once(futures::future::ready(Ok(data)));
@@ -170,15 +196,18 @@ impl ObjectStore for InMemory {
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
- let data = self.entry(location).await?;
+ let entry = self.entry(location).await?;
ranges
.iter()
.map(|range| {
let range = range.clone();
- let len = data.0.len();
- ensure!(range.end <= data.0.len(), OutOfRangeSnafu { range,
len });
+ let len = entry.data.len();
+ ensure!(
+ range.end <= entry.data.len(),
+ OutOfRangeSnafu { range, len }
+ );
ensure!(range.start <= range.end, BadRangeSnafu { range });
- Ok(data.0.slice(range))
+ Ok(entry.data.slice(range))
})
.collect()
}
@@ -188,14 +217,14 @@ impl ObjectStore for InMemory {
Ok(ObjectMeta {
location: location.clone(),
- last_modified: entry.1,
- size: entry.0.len(),
- e_tag: None,
+ last_modified: entry.last_modified,
+ size: entry.data.len(),
+ e_tag: Some(entry.e_tag.to_string()),
})
}
async fn delete(&self, location: &Path) -> Result<()> {
- self.storage.write().remove(location);
+ self.storage.write().map.remove(location);
Ok(())
}
@@ -208,6 +237,7 @@ impl ObjectStore for InMemory {
let storage = self.storage.read();
let values: Vec<_> = storage
+ .map
.range((prefix)..)
.take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
.filter(|(key, _)| {
@@ -219,9 +249,9 @@ impl ObjectStore for InMemory {
.map(|(key, value)| {
Ok(ObjectMeta {
location: key.clone(),
- last_modified: value.1,
- size: value.0.len(),
- e_tag: None,
+ last_modified: value.last_modified,
+ size: value.data.len(),
+ e_tag: Some(value.e_tag.to_string()),
})
})
.collect();
@@ -241,7 +271,7 @@ impl ObjectStore for InMemory {
// Only objects in this base level should be returned in the
// response. Otherwise, we just collect the common prefixes.
let mut objects = vec![];
- for (k, v) in self.storage.read().range((prefix)..) {
+ for (k, v) in self.storage.read().map.range((prefix)..) {
if !k.as_ref().starts_with(prefix.as_ref()) {
break;
}
@@ -263,9 +293,9 @@ impl ObjectStore for InMemory {
} else {
let object = ObjectMeta {
location: k.clone(),
- last_modified: v.1,
- size: v.0.len(),
- e_tag: None,
+ last_modified: v.last_modified,
+ size: v.data.len(),
+ e_tag: Some(v.e_tag.to_string()),
};
objects.push(object);
}
@@ -278,23 +308,21 @@ impl ObjectStore for InMemory {
}
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- let data = self.entry(from).await?;
- self.storage
- .write()
- .insert(to.clone(), (data.0, Utc::now()));
+ let entry = self.entry(from).await?;
+ self.storage.write().insert(to, entry.data);
Ok(())
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- let data = self.entry(from).await?;
+ let entry = self.entry(from).await?;
let mut storage = self.storage.write();
- if storage.contains_key(to) {
+ if storage.map.contains_key(to) {
return Err(Error::AlreadyExists {
path: to.to_string(),
}
.into());
}
- storage.insert(to.clone(), (data.0, Utc::now()));
+ storage.insert(to, entry.data);
Ok(())
}
}
@@ -319,9 +347,10 @@ impl InMemory {
self.fork()
}
- async fn entry(&self, location: &Path) -> Result<(Bytes, DateTime<Utc>)> {
+ async fn entry(&self, location: &Path) -> Result<Entry> {
let storage = self.storage.read();
let value = storage
+ .map
.get(location)
.cloned()
.context(NoDataInMemorySnafu {
@@ -335,7 +364,7 @@ impl InMemory {
struct InMemoryUpload {
location: Path,
data: Vec<u8>,
- storage: StorageType,
+ storage: Arc<RwLock<Storage>>,
}
impl AsyncWrite for InMemoryUpload {
@@ -343,7 +372,7 @@ impl AsyncWrite for InMemoryUpload {
mut self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &[u8],
- ) -> std::task::Poll<Result<usize, io::Error>> {
+ ) -> Poll<Result<usize, io::Error>> {
self.data.extend_from_slice(buf);
Poll::Ready(Ok(buf.len()))
}
@@ -351,18 +380,16 @@ impl AsyncWrite for InMemoryUpload {
fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), io::Error>> {
+ ) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), io::Error>> {
+ ) -> Poll<Result<(), io::Error>> {
let data = Bytes::from(std::mem::take(&mut self.data));
- self.storage
- .write()
- .insert(self.location.clone(), (data, Utc::now()));
+ self.storage.write().insert(&self.location, data);
Poll::Ready(Ok(()))
}
}
@@ -370,7 +397,7 @@ impl AsyncWrite for InMemoryUpload {
struct InMemoryAppend {
location: Path,
data: Vec<u8>,
- storage: StorageType,
+ storage: Arc<RwLock<Storage>>,
}
impl AsyncWrite for InMemoryAppend {
@@ -378,7 +405,7 @@ impl AsyncWrite for InMemoryAppend {
mut self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &[u8],
- ) -> std::task::Poll<Result<usize, io::Error>> {
+ ) -> Poll<Result<usize, io::Error>> {
self.data.extend_from_slice(buf);
Poll::Ready(Ok(buf.len()))
}
@@ -386,20 +413,18 @@ impl AsyncWrite for InMemoryAppend {
fn poll_flush(
mut self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Result<(), io::Error>> {
- let storage = StorageType::clone(&self.storage);
+ ) -> Poll<Result<(), io::Error>> {
+ let storage = Arc::clone(&self.storage);
let mut writer = storage.write();
- if let Some((bytes, _)) = writer.remove(&self.location) {
+ if let Some(entry) = writer.map.remove(&self.location) {
let buf = std::mem::take(&mut self.data);
- let concat = Bytes::from_iter(bytes.into_iter().chain(buf));
- writer.insert(self.location.clone(), (concat, Utc::now()));
+ let concat = Bytes::from_iter(entry.data.into_iter().chain(buf));
+ writer.insert(&self.location, concat);
} else {
- writer.insert(
- self.location.clone(),
- (Bytes::from(std::mem::take(&mut self.data)), Utc::now()),
- );
+ let data = Bytes::from(std::mem::take(&mut self.data));
+ writer.insert(&self.location, data);
};
Poll::Ready(Ok(()))
}