alamb commented on code in PR #4978:
URL: https://github.com/apache/arrow-rs/pull/4978#discussion_r1374994374
##########
object_store/src/lib.rs:
##########
@@ -38,20 +38,27 @@
//!
//! # Highlights
//!
-//! 1. A focused, easy to use, idiomatic, well documented, high
-//! performance, `async` API.
+//! 1. A high-performance async API focused on providing a consistent interface
+//! mirroring that of object stores such as [S3]
//!
//! 2. Production quality, leading this crate to be used in large
-//! scale production systems, such as [crates.io] and [InfluxDB IOx].
+//! scale production systems, such as [crates.io] and [InfluxDB IOx]
//!
-//! 3. Stable and predictable governance via the [Apache Arrow] project.
+//! 3. Support for advanced functionality, including [ACID] reads
Review Comment:
I suggest you don't use ACID here, as it is a database term and not as
commonly applied to object stores in my experience.
For example, I think the claim that the D in ACID for **D**urable counts as
"advanced" functionality for object stores / their APIs is confusing. It is
also unclear to me what this crate adds for `Isolation` in the classic
database sense of the word.
Perhaps a better term would be "atomic reads/writes", which also highlights
the feature I think you are hinting at.
##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
//! # }
//! # async fn multi_upload() {
//! #
-//! let object_store: Arc<dyn ObjectStore> = get_object_store();
-//! let path: Path = "data/large_file".try_into().unwrap();
-//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
-//!
-//! let bytes = Bytes::from_static(b"hello");
-//! writer.write_all(&bytes).await.unwrap();
-//! writer.flush().await.unwrap();
-//! writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//! data: Bytes,
+//! e_tag: String,
+//! refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
+//! entries: HashMap<Path, CacheEntry>,
+//! store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//! pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//! Ok(match self.entries.get_mut(path) {
+//! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
+//! true => e.data.clone(), // Return cached data
+//! false => {
+//! let opts = GetOptions {
+//! if_none_match: Some(e.e_tag.clone()),
+//! ..GetOptions::default()
+//! };
+//! match self.store.get_opts(&path, opts).await {
+//! Ok(d) => e.data = d.bytes().await?,
+//! Err(Error::NotModified { .. }) => {} // Data has not changed
+//! Err(e) => return Err(e),
+//! };
+//! e.refreshed_at = Instant::now();
+//! e.data.clone()
+//! }
+//! },
+//! None => {
+//! let get = self.store.get(&path).await?;
+//! let e_tag = get.meta.e_tag.clone();
+//! let data = get.bytes().await?;
+//! if let Some(e_tag) = e_tag {
+//! let entry = CacheEntry {
+//! e_tag,
+//! data: data.clone(),
+//! refreshed_at: Instant::now(),
+//! };
+//! self.entries.insert(path.clone(), entry);
+//! }
+//! data
+//! }
+//! })
+//! }
+//! }
+//! ```
+//!
+//! # Conditional Put
+//!
+//! The default behaviour when writing data is to upsert any existing object at the given path.
+//! More complex behaviours can be achieved using [`PutMode`], and can be used to build
+//! [Optimistic Concurrency Control] based transactions. This facilitates building metadata catalogs,
+//! such as [Apache Iceberg] or [Delta Lake], directly on top of object storage, without relying on
+//! a separate DBMS.
+//!
+//! ```
+//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::memory::InMemory;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! # Arc::new(InMemory::new())
+//! # }
+//! # fn do_update(b: Bytes) -> Bytes {b}
+//! # async fn conditional_put() {
Review Comment:
It might be helpful to have this name exposed
```suggestion
//! async fn conditional_put() {
```
##########
object_store/src/lib.rs:
##########
@@ -79,6 +86,22 @@
doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
)]
//!
+//! # Why not a Filesystem Interface?
+//!
+//! Whilst this crate does provide a [`BufReader`], the [`ObjectStore`] interface mirrors the APIs
+//! of object stores and not filesystems.
Review Comment:
I think it might help to give an example of what a filesystem API means.
Perhaps something like
```suggestion
//! of object stores and not filesystems. For example, it purposely does not offer a `Seek` like API.
```
##########
object_store/src/lib.rs:
##########
@@ -38,20 +38,27 @@
//!
//! # Highlights
//!
-//! 1. A focused, easy to use, idiomatic, well documented, high
-//! performance, `async` API.
+//! 1. A high-performance async API focused on providing a consistent interface
+//! mirroring that of object stores such as [S3]
//!
//! 2. Production quality, leading this crate to be used in large
-//! scale production systems, such as [crates.io] and [InfluxDB IOx].
+//! scale production systems, such as [crates.io] and [InfluxDB IOx]
//!
-//! 3. Stable and predictable governance via the [Apache Arrow] project.
+//! 3. Support for advanced functionality, including [ACID] reads
+//! and writes, vectored IO, bulk deletion, and more...
+//!
+//! 4. Stable and predictable governance via the [Apache Arrow] project
+//!
+//! 5. Very low dependency footprint
Review Comment:
```suggestion
//! 5. Small dependency footprint (depends on only a small number of common crates)
```
##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
//! # }
//! # async fn multi_upload() {
//! #
-//! let object_store: Arc<dyn ObjectStore> = get_object_store();
-//! let path: Path = "data/large_file".try_into().unwrap();
-//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
-//!
-//! let bytes = Bytes::from_static(b"hello");
-//! writer.write_all(&bytes).await.unwrap();
-//! writer.flush().await.unwrap();
-//! writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//! data: Bytes,
+//! e_tag: String,
+//! refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
+//! entries: HashMap<Path, CacheEntry>,
+//! store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//! pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//! Ok(match self.entries.get_mut(path) {
+//! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
+//! true => e.data.clone(), // Return cached data
+//! false => {
+//! let opts = GetOptions {
+//! if_none_match: Some(e.e_tag.clone()),
+//! ..GetOptions::default()
+//! };
+//! match self.store.get_opts(&path, opts).await {
+//! Ok(d) => e.data = d.bytes().await?,
+//! Err(Error::NotModified { .. }) => {} // Data has not changed
+//! Err(e) => return Err(e),
+//! };
+//! e.refreshed_at = Instant::now();
+//! e.data.clone()
+//! }
+//! },
+//! None => {
+//! let get = self.store.get(&path).await?;
+//! let e_tag = get.meta.e_tag.clone();
+//! let data = get.bytes().await?;
+//! if let Some(e_tag) = e_tag {
+//! let entry = CacheEntry {
+//! e_tag,
+//! data: data.clone(),
+//! refreshed_at: Instant::now(),
+//! };
+//! self.entries.insert(path.clone(), entry);
+//! }
+//! data
+//! }
+//! })
+//! }
+//! }
+//! ```
+//!
+//! # Conditional Put
+//!
+//! The default behaviour when writing data is to upsert any existing object at the given path.
Review Comment:
```suggestion
//! The default behaviour when writing data is to upsert any existing object at the given path, overwriting any previous values.
```
##########
object_store/src/lib.rs:
##########
@@ -141,25 +199,34 @@
//! # use futures::TryStreamExt;
//! # use object_store::local::LocalFileSystem;
//! # use std::sync::Arc;
-//! # use object_store::{path::Path, ObjectStore};
+//! # use bytes::Bytes;
+//! # use object_store::{path::Path, ObjectStore, GetResult};
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
//! # }
//! #
//! # async fn example() {
//! #
-//! // create an ObjectStore
+//! // Create an ObjectStore
//! let object_store: Arc<dyn ObjectStore> = get_object_store();
//!
//! // Retrieve a specific file
-//! let path: Path = "data/file01.parquet".try_into().unwrap();
+//! let path = Path::from("data/file01.parquet");
Review Comment:
I find the shadowing of `Path` with `std::path::Path` always a bit
confusing. Maybe we could make the `use object_store::path::Path` unhidden
to make this clearer.
##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
//! # }
//! # async fn multi_upload() {
//! #
-//! let object_store: Arc<dyn ObjectStore> = get_object_store();
-//! let path: Path = "data/large_file".try_into().unwrap();
-//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
-//!
-//! let bytes = Bytes::from_static(b"hello");
-//! writer.write_all(&bytes).await.unwrap();
-//! writer.flush().await.unwrap();
-//! writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//! data: Bytes,
+//! e_tag: String,
+//! refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
+//! entries: HashMap<Path, CacheEntry>,
+//! store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//! pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//! Ok(match self.entries.get_mut(path) {
+//! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
+//! true => e.data.clone(), // Return cached data
+//! false => {
Review Comment:
```suggestion
//! false => { // check if remote version has changed
```
##########
object_store/src/lib.rs:
##########
@@ -190,15 +253,48 @@
//! # }
//! # async fn put() {
//! #
-//! let object_store: Arc<dyn ObjectStore> = get_object_store();
-//! let path: Path = "data/file1".try_into().unwrap();
-//! let bytes = Bytes::from_static(b"hello");
-//! object_store.put(&path, bytes).await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/file1");
+//! let bytes = Bytes::from_static(b"hello");
+//! object_store.put(&path, bytes).await.unwrap();
+//! # }
+//! ```
+//!
+//! # Multipart Upload
+//!
+//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data,
+//! with implementations automatically handling parallel, chunked upload where appropriate.
+//!
+//! ```
+//! # use object_store::local::LocalFileSystem;
+//! # use object_store::ObjectStore;
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! # Arc::new(LocalFileSystem::new())
+//! # }
+//! # async fn multi_upload() {
+//! #
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
+//!
+//! let bytes = Bytes::from_static(b"hello");
+//! writer.write_all(&bytes).await.unwrap();
+//! writer.flush().await.unwrap();
+//! writer.shutdown().await.unwrap();
//! # }
//! ```
//!
-//! # Multipart put object
-//! Use the [`ObjectStore::put_multipart`] method to save large amount of data in chunks.
+//! # Vectored Read
+//!
+//! A common pattern, especially when reading structured datasets, is to need to fetch
+//! multiple ranges of a particular object.
Review Comment:
```suggestion
//! A common pattern, especially when reading structured datasets, is to fetch
//! multiple, potentially discontiguous, ranges of a particular object.
```
##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
//! # }
//! # async fn multi_upload() {
//! #
-//! let object_store: Arc<dyn ObjectStore> = get_object_store();
-//! let path: Path = "data/large_file".try_into().unwrap();
-//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
-//!
-//! let bytes = Bytes::from_static(b"hello");
-//! writer.write_all(&bytes).await.unwrap();
-//! writer.flush().await.unwrap();
-//! writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//! data: Bytes,
+//! e_tag: String,
+//! refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
Review Comment:
```suggestion
//! /// Example cache that returns a cached version of data from a remote
//! /// ObjectStore, checking every 10 seconds for a new version
//! struct Cache {
```
##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
//! # }
//! # async fn multi_upload() {
//! #
-//! let object_store: Arc<dyn ObjectStore> = get_object_store();
-//! let path: Path = "data/large_file".try_into().unwrap();
-//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
-//!
-//! let bytes = Bytes::from_static(b"hello");
-//! writer.write_all(&bytes).await.unwrap();
-//! writer.flush().await.unwrap();
-//! writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//! data: Bytes,
+//! e_tag: String,
+//! refreshed_at: Instant,
+//! }
Review Comment:
```suggestion
//! struct CacheEntry {
//! /// Data returned by last request
//! data: Bytes,
//! /// Entity tag, provided by the server, identifying the object
//! e_tag: String,
//! /// Time of last refresh
//! refreshed_at: Instant,
//! }
```
##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
//! # }
//! # async fn multi_upload() {
//! #
-//! let object_store: Arc<dyn ObjectStore> = get_object_store();
-//! let path: Path = "data/large_file".try_into().unwrap();
-//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
-//!
-//! let bytes = Bytes::from_static(b"hello");
-//! writer.write_all(&bytes).await.unwrap();
-//! writer.flush().await.unwrap();
-//! writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//! data: Bytes,
+//! e_tag: String,
+//! refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
+//! entries: HashMap<Path, CacheEntry>,
+//! store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//! pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//! Ok(match self.entries.get_mut(path) {
+//! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
+//! true => e.data.clone(), // Return cached data
+//! false => {
+//! let opts = GetOptions {
+//! if_none_match: Some(e.e_tag.clone()),
+//! ..GetOptions::default()
+//! };
+//! match self.store.get_opts(&path, opts).await {
+//! Ok(d) => e.data = d.bytes().await?,
+//! Err(Error::NotModified { .. }) => {} // Data has not changed
+//! Err(e) => return Err(e),
+//! };
+//! e.refreshed_at = Instant::now();
+//! e.data.clone()
+//! }
+//! },
+//! None => {
Review Comment:
```suggestion
//! None => { // not cached, fetch value
```
##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
//! # }
//! # async fn multi_upload() {
//! #
-//! let object_store: Arc<dyn ObjectStore> = get_object_store();
-//! let path: Path = "data/large_file".try_into().unwrap();
-//! let (_id, mut writer) = object_store.put_multipart(&path).await.unwrap();
-//!
-//! let bytes = Bytes::from_static(b"hello");
-//! writer.write_all(&bytes).await.unwrap();
-//! writer.flush().await.unwrap();
-//! writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//! data: Bytes,
+//! e_tag: String,
+//! refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
+//! entries: HashMap<Path, CacheEntry>,
+//! store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//! pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//! Ok(match self.entries.get_mut(path) {
+//! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
+//! true => e.data.clone(), // Return cached data
+//! false => {
+//! let opts = GetOptions {
+//! if_none_match: Some(e.e_tag.clone()),
+//! ..GetOptions::default()
+//! };
+//! match self.store.get_opts(&path, opts).await {
+//! Ok(d) => e.data = d.bytes().await?,
+//! Err(Error::NotModified { .. }) => {} // Data has not changed
+//! Err(e) => return Err(e),
+//! };
+//! e.refreshed_at = Instant::now();
+//! e.data.clone()
+//! }
+//! },
+//! None => {
+//! let get = self.store.get(&path).await?;
+//! let e_tag = get.meta.e_tag.clone();
+//! let data = get.bytes().await?;
+//! if let Some(e_tag) = e_tag {
+//! let entry = CacheEntry {
+//! e_tag,
+//! data: data.clone(),
+//! refreshed_at: Instant::now(),
+//! };
+//! self.entries.insert(path.clone(), entry);
+//! }
+//! data
+//! }
+//! })
+//! }
+//! }
+//! ```
+//!
+//! # Conditional Put
+//!
+//! The default behaviour when writing data is to upsert any existing object at the given path.
+//! More complex behaviours can be achieved using [`PutMode`], and can be used to build
+//! [Optimistic Concurrency Control] based transactions. This facilitates building metadata catalogs,
+//! such as [Apache Iceberg] or [Delta Lake], directly on top of object storage, without relying on
+//! a separate DBMS.
+//!
+//! ```
+//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::memory::InMemory;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! # Arc::new(InMemory::new())
+//! # }
+//! # fn do_update(b: Bytes) -> Bytes {b}
+//! # async fn conditional_put() {
+//! let store = get_object_store();
+//! let path = Path::from("test");
+//! loop {
+//! // Perform get request
+//! let r = store.get(&path).await.unwrap();
+//!
+//! // Save version information fetched
+//! let version = UpdateVersion {
+//! e_tag: r.meta.e_tag.clone(),
+//! version: r.meta.version.clone(),
+//! };
+//!
+//! // Compute new version of object contents
+//! let new = do_update(r.bytes().await.unwrap());
+//!
+//! // Attempt to commit transaction
+//! match store.put_opts(&path, new, PutMode::Update(version).into()).await {
+//! Ok(_) => break, // Successfully committed
+//! Err(Error::Precondition { .. }) => continue, // Transaction conflict, try again
Review Comment:
```suggestion
//! Err(Error::Precondition { .. }) => continue, // object has been changed, try again
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]