alamb commented on code in PR #4978:
URL: https://github.com/apache/arrow-rs/pull/4978#discussion_r1374994374


##########
object_store/src/lib.rs:
##########
@@ -38,20 +38,27 @@
 //!
 //! # Highlights
 //!
-//! 1. A focused, easy to use, idiomatic, well documented, high
-//! performance, `async` API.
+//! 1. A high-performance async API focused on providing a consistent interface
+//! mirroring that of object stores such as [S3]
 //!
 //! 2. Production quality, leading this crate to be used in large
-//! scale production systems, such as [crates.io] and [InfluxDB IOx].
+//! scale production systems, such as [crates.io] and [InfluxDB IOx]
 //!
-//! 3. Stable and predictable governance via the [Apache Arrow] project.
+//! 3. Support for advanced functionality, including [ACID] reads

Review Comment:
   I suggest you don't use ACID here as it is a database term and not as 
commonly applied to object stores in my experience. 
   
   For example, I think the claim that the D in ACID for **D**urable counts as 
"advanced" functionality for object stores / their APIs is confusing. It is 
also unclear to me what this crate adds for `Isolation` in the classic 
database sense of the word. 
   
   Perhaps a better term would be "atomic reads/writes", which also highlights 
the feature I think you are hinting at.



##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
 //! # }
 //! # async fn multi_upload() {
 //! #
-//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
-//!  let path: Path = "data/large_file".try_into().unwrap();
-//!  let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
-//!
-//!  let bytes = Bytes::from_static(b"hello");
-//!  writer.write_all(&bytes).await.unwrap();
-//!  writer.flush().await.unwrap();
-//!  writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 
0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by 
[`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire 
object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//!     data: Bytes,
+//!     e_tag: String,
+//!     refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
+//!     entries: HashMap<Path, CacheEntry>,
+//!     store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//!         Ok(match self.entries.get_mut(path) {
+//!             Some(e) => match e.refreshed_at.elapsed() < 
Duration::from_secs(10) {
+//!                 true => e.data.clone(), // Return cached data
+//!                 false => {
+//!                     let opts = GetOptions {
+//!                         if_none_match: Some(e.e_tag.clone()),
+//!                         ..GetOptions::default()
+//!                     };
+//!                     match self.store.get_opts(&path, opts).await {
+//!                         Ok(d) => e.data = d.bytes().await?,
+//!                         Err(Error::NotModified { .. }) => {} // Data has 
not changed
+//!                         Err(e) => return Err(e),
+//!                     };
+//!                     e.refreshed_at = Instant::now();
+//!                     e.data.clone()
+//!                 }
+//!             },
+//!             None => {
+//!                 let get = self.store.get(&path).await?;
+//!                 let e_tag = get.meta.e_tag.clone();
+//!                 let data = get.bytes().await?;
+//!                 if let Some(e_tag) = e_tag {
+//!                     let entry = CacheEntry {
+//!                         e_tag,
+//!                         data: data.clone(),
+//!                         refreshed_at: Instant::now(),
+//!                     };
+//!                     self.entries.insert(path.clone(), entry);
+//!                 }
+//!                 data
+//!             }
+//!         })
+//!     }
+//! }
+//! ```
+//!
+//! # Conditional Put
+//!
+//! The default behaviour when writing data is to upsert any existing object 
at the given path.
+//! More complex behaviours can be achieved using [`PutMode`], and can be used 
to build
+//! [Optimistic Concurrency Control] based transactions. This facilitates 
building metadata catalogs,
+//! such as [Apache Iceberg] or [Delta Lake], directly on top of object 
storage, without relying on
+//! a separate DBMS.
+//!
+//! ```
+//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::memory::InMemory;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(InMemory::new())
+//! # }
+//! # fn do_update(b: Bytes) -> Bytes {b}
+//! # async fn conditional_put() {

Review Comment:
   It might be helpful to have this name exposed.
   
   ```suggestion
   //! async fn conditional_put() {
   ```



##########
object_store/src/lib.rs:
##########
@@ -79,6 +86,22 @@
     doc = "* [`http`]: [HTTP/WebDAV 
Storage](https://datatracker.ietf.org/doc/html/rfc2518). See 
[`HttpBuilder`](http::HttpBuilder)"
 )]
 //!
+//! # Why not a Filesystem Interface?
+//!
+//! Whilst this crate does provide a [`BufReader`], the [`ObjectStore`] 
interface mirrors the APIs
+//! of object stores and not filesystems.

Review Comment:
   I think it might help to give an example of what a filesystem API means. 
Perhaps something like
   
   ```suggestion
   //! of object stores and not filesystems. For example, it purposely does not 
offer a `Seek` like API.
   ```



##########
object_store/src/lib.rs:
##########
@@ -38,20 +38,27 @@
 //!
 //! # Highlights
 //!
-//! 1. A focused, easy to use, idiomatic, well documented, high
-//! performance, `async` API.
+//! 1. A high-performance async API focused on providing a consistent interface
+//! mirroring that of object stores such as [S3]
 //!
 //! 2. Production quality, leading this crate to be used in large
-//! scale production systems, such as [crates.io] and [InfluxDB IOx].
+//! scale production systems, such as [crates.io] and [InfluxDB IOx]
 //!
-//! 3. Stable and predictable governance via the [Apache Arrow] project.
+//! 3. Support for advanced functionality, including [ACID] reads
+//! and writes, vectored IO, bulk deletion, and more...
+//!
+//! 4. Stable and predictable governance via the [Apache Arrow] project
+//!
+//! 5. Very low dependency footprint

Review Comment:
   ```suggestion
   //! 5. Small dependency footprint (depends on only a small number of common 
crates)
   ```



##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
 //! # }
 //! # async fn multi_upload() {
 //! #
-//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
-//!  let path: Path = "data/large_file".try_into().unwrap();
-//!  let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
-//!
-//!  let bytes = Bytes::from_static(b"hello");
-//!  writer.write_all(&bytes).await.unwrap();
-//!  writer.flush().await.unwrap();
-//!  writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 
0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by 
[`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire 
object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//!     data: Bytes,
+//!     e_tag: String,
+//!     refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
+//!     entries: HashMap<Path, CacheEntry>,
+//!     store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//!         Ok(match self.entries.get_mut(path) {
+//!             Some(e) => match e.refreshed_at.elapsed() < 
Duration::from_secs(10) {
+//!                 true => e.data.clone(), // Return cached data
+//!                 false => {
+//!                     let opts = GetOptions {
+//!                         if_none_match: Some(e.e_tag.clone()),
+//!                         ..GetOptions::default()
+//!                     };
+//!                     match self.store.get_opts(&path, opts).await {
+//!                         Ok(d) => e.data = d.bytes().await?,
+//!                         Err(Error::NotModified { .. }) => {} // Data has 
not changed
+//!                         Err(e) => return Err(e),
+//!                     };
+//!                     e.refreshed_at = Instant::now();
+//!                     e.data.clone()
+//!                 }
+//!             },
+//!             None => {
+//!                 let get = self.store.get(&path).await?;
+//!                 let e_tag = get.meta.e_tag.clone();
+//!                 let data = get.bytes().await?;
+//!                 if let Some(e_tag) = e_tag {
+//!                     let entry = CacheEntry {
+//!                         e_tag,
+//!                         data: data.clone(),
+//!                         refreshed_at: Instant::now(),
+//!                     };
+//!                     self.entries.insert(path.clone(), entry);
+//!                 }
+//!                 data
+//!             }
+//!         })
+//!     }
+//! }
+//! ```
+//!
+//! # Conditional Put
+//!
+//! The default behaviour when writing data is to upsert any existing object 
at the given path.

Review Comment:
   ```suggestion
   //! The default behaviour when writing data is to upsert any existing object 
at the given path, overwriting any previous values.
   ```



##########
object_store/src/lib.rs:
##########
@@ -141,25 +199,34 @@
 //! # use futures::TryStreamExt;
 //! # use object_store::local::LocalFileSystem;
 //! # use std::sync::Arc;
-//! # use object_store::{path::Path, ObjectStore};
+//! #  use bytes::Bytes;
+//! # use object_store::{path::Path, ObjectStore, GetResult};
 //! # fn get_object_store() -> Arc<dyn ObjectStore> {
 //! #   Arc::new(LocalFileSystem::new())
 //! # }
 //! #
 //! # async fn example() {
 //! #
-//! // create an ObjectStore
+//! // Create an ObjectStore
 //! let object_store: Arc<dyn ObjectStore> = get_object_store();
 //!
 //! // Retrieve a specific file
-//! let path: Path = "data/file01.parquet".try_into().unwrap();
+//! let path = Path::from("data/file01.parquet");

Review Comment:
   I find the shadowing of `Path` with `std::path::Path` always a bit 
confusing. Maybe we could make the `use object_store::path::Path` unhidden to 
make this clearer.



##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
 //! # }
 //! # async fn multi_upload() {
 //! #
-//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
-//!  let path: Path = "data/large_file".try_into().unwrap();
-//!  let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
-//!
-//!  let bytes = Bytes::from_static(b"hello");
-//!  writer.write_all(&bytes).await.unwrap();
-//!  writer.flush().await.unwrap();
-//!  writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 
0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by 
[`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire 
object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//!     data: Bytes,
+//!     e_tag: String,
+//!     refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
+//!     entries: HashMap<Path, CacheEntry>,
+//!     store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//!         Ok(match self.entries.get_mut(path) {
+//!             Some(e) => match e.refreshed_at.elapsed() < 
Duration::from_secs(10) {
+//!                 true => e.data.clone(), // Return cached data
+//!                 false => {

Review Comment:
   ```suggestion
   //!                 false => { // check if remote version has changed
   ```



##########
object_store/src/lib.rs:
##########
@@ -190,15 +253,48 @@
 //! # }
 //! # async fn put() {
 //! #
-//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
-//!  let path: Path = "data/file1".try_into().unwrap();
-//!  let bytes = Bytes::from_static(b"hello");
-//!  object_store.put(&path, bytes).await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/file1");
+//! let bytes = Bytes::from_static(b"hello");
+//! object_store.put(&path, bytes).await.unwrap();
+//! # }
+//! ```
+//!
+//! #  Multipart Upload
+//!
+//! Use the [`ObjectStore::put_multipart`] method to atomically write a large 
amount of data,
+//! with implementations automatically handling parallel, chunked upload where 
appropriate.
+//!
+//! ```
+//! # use object_store::local::LocalFileSystem;
+//! # use object_store::ObjectStore;
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(LocalFileSystem::new())
+//! # }
+//! # async fn multi_upload() {
+//! #
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
+//!
+//! let bytes = Bytes::from_static(b"hello");
+//! writer.write_all(&bytes).await.unwrap();
+//! writer.flush().await.unwrap();
+//! writer.shutdown().await.unwrap();
 //! # }
 //! ```
 //!
-//! #  Multipart put object
-//! Use the [`ObjectStore::put_multipart`] method to save large amount of data 
in chunks.
+//! # Vectored Read
+//!
+//! A common pattern, especially when reading structured datasets, is to need 
to fetch
+//! multiple ranges of a particular object.

Review Comment:
   ```suggestion
   //! A common pattern, especially when reading structured datasets, is to 
fetch
   //! multiple, potentially discontiguous, ranges of a particular object.
   ```



##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
 //! # }
 //! # async fn multi_upload() {
 //! #
-//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
-//!  let path: Path = "data/large_file".try_into().unwrap();
-//!  let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
-//!
-//!  let bytes = Bytes::from_static(b"hello");
-//!  writer.write_all(&bytes).await.unwrap();
-//!  writer.flush().await.unwrap();
-//!  writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 
0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by 
[`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire 
object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//!     data: Bytes,
+//!     e_tag: String,
+//!     refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {

Review Comment:
   ```suggestion
   //!  /// Example cache that returns a cached version of data from a remote
   //!  /// ObjectStore, checking every 10 seconds for a new version
   //! struct Cache {
   ```



##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
 //! # }
 //! # async fn multi_upload() {
 //! #
-//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
-//!  let path: Path = "data/large_file".try_into().unwrap();
-//!  let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
-//!
-//!  let bytes = Bytes::from_static(b"hello");
-//!  writer.write_all(&bytes).await.unwrap();
-//!  writer.flush().await.unwrap();
-//!  writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 
0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by 
[`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire 
object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//!     data: Bytes,
+//!     e_tag: String,
+//!     refreshed_at: Instant,
+//! }

Review Comment:
   ```suggestion
   //! struct CacheEntry {
   //!     /// Data returned by last request
   //!     data: Bytes,
   //!     /// Entity tag, provided by the server, identifying the object
   //!     e_tag: String,
   //!     /// Time of last refresh
   //!     refreshed_at: Instant,
   //! }
   ```



##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
 //! # }
 //! # async fn multi_upload() {
 //! #
-//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
-//!  let path: Path = "data/large_file".try_into().unwrap();
-//!  let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
-//!
-//!  let bytes = Bytes::from_static(b"hello");
-//!  writer.write_all(&bytes).await.unwrap();
-//!  writer.flush().await.unwrap();
-//!  writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 
0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by 
[`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire 
object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//!     data: Bytes,
+//!     e_tag: String,
+//!     refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
+//!     entries: HashMap<Path, CacheEntry>,
+//!     store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//!         Ok(match self.entries.get_mut(path) {
+//!             Some(e) => match e.refreshed_at.elapsed() < 
Duration::from_secs(10) {
+//!                 true => e.data.clone(), // Return cached data
+//!                 false => {
+//!                     let opts = GetOptions {
+//!                         if_none_match: Some(e.e_tag.clone()),
+//!                         ..GetOptions::default()
+//!                     };
+//!                     match self.store.get_opts(&path, opts).await {
+//!                         Ok(d) => e.data = d.bytes().await?,
+//!                         Err(Error::NotModified { .. }) => {} // Data has 
not changed
+//!                         Err(e) => return Err(e),
+//!                     };
+//!                     e.refreshed_at = Instant::now();
+//!                     e.data.clone()
+//!                 }
+//!             },
+//!             None => {

Review Comment:
   ```suggestion
   //!             None => { // not cached, fetch value
   ```



##########
object_store/src/lib.rs:
##########
@@ -212,16 +308,128 @@
 //! # }
 //! # async fn multi_upload() {
 //! #
-//!  let object_store: Arc<dyn ObjectStore> = get_object_store();
-//!  let path: Path = "data/large_file".try_into().unwrap();
-//!  let (_id, mut writer) =  object_store.put_multipart(&path).await.unwrap();
-//!
-//!  let bytes = Bytes::from_static(b"hello");
-//!  writer.write_all(&bytes).await.unwrap();
-//!  writer.flush().await.unwrap();
-//!  writer.shutdown().await.unwrap();
+//! let object_store: Arc<dyn ObjectStore> = get_object_store();
+//! let path = Path::from("data/large_file");
+//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 
0..10]).await.unwrap();
+//! assert_eq!(ranges.len(), 3);
+//! assert_eq!(ranges[0].len(), 10);
+//! # }
+//! ```
+//!
+//! # Conditional Fetch
+//!
+//! More complex object retrieval can be supported by 
[`ObjectStore::get_opts`].
+//!
+//! For example, efficiently refreshing a cache without re-fetching the entire 
object
+//! data if the object hasn't been modified.
+//!
+//! ```
+//! # use std::collections::btree_map::Entry;
+//! # use std::collections::HashMap;
+//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
+//! # use std::sync::Arc;
+//! # use std::time::{Duration, Instant};
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::path::Path;
+//! struct CacheEntry {
+//!     data: Bytes,
+//!     e_tag: String,
+//!     refreshed_at: Instant,
+//! }
+//!
+//! struct Cache {
+//!     entries: HashMap<Path, CacheEntry>,
+//!     store: Arc<dyn ObjectStore>,
+//! }
+//!
+//! impl Cache {
+//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
+//!         Ok(match self.entries.get_mut(path) {
+//!             Some(e) => match e.refreshed_at.elapsed() < 
Duration::from_secs(10) {
+//!                 true => e.data.clone(), // Return cached data
+//!                 false => {
+//!                     let opts = GetOptions {
+//!                         if_none_match: Some(e.e_tag.clone()),
+//!                         ..GetOptions::default()
+//!                     };
+//!                     match self.store.get_opts(&path, opts).await {
+//!                         Ok(d) => e.data = d.bytes().await?,
+//!                         Err(Error::NotModified { .. }) => {} // Data has 
not changed
+//!                         Err(e) => return Err(e),
+//!                     };
+//!                     e.refreshed_at = Instant::now();
+//!                     e.data.clone()
+//!                 }
+//!             },
+//!             None => {
+//!                 let get = self.store.get(&path).await?;
+//!                 let e_tag = get.meta.e_tag.clone();
+//!                 let data = get.bytes().await?;
+//!                 if let Some(e_tag) = e_tag {
+//!                     let entry = CacheEntry {
+//!                         e_tag,
+//!                         data: data.clone(),
+//!                         refreshed_at: Instant::now(),
+//!                     };
+//!                     self.entries.insert(path.clone(), entry);
+//!                 }
+//!                 data
+//!             }
+//!         })
+//!     }
+//! }
+//! ```
+//!
+//! # Conditional Put
+//!
+//! The default behaviour when writing data is to upsert any existing object 
at the given path.
+//! More complex behaviours can be achieved using [`PutMode`], and can be used 
to build
+//! [Optimistic Concurrency Control] based transactions. This facilitates 
building metadata catalogs,
+//! such as [Apache Iceberg] or [Delta Lake], directly on top of object 
storage, without relying on
+//! a separate DBMS.
+//!
+//! ```
+//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
+//! # use std::sync::Arc;
+//! # use bytes::Bytes;
+//! # use tokio::io::AsyncWriteExt;
+//! # use object_store::memory::InMemory;
+//! # use object_store::path::Path;
+//! # fn get_object_store() -> Arc<dyn ObjectStore> {
+//! #   Arc::new(InMemory::new())
+//! # }
+//! # fn do_update(b: Bytes) -> Bytes {b}
+//! # async fn conditional_put() {
+//! let store = get_object_store();
+//! let path = Path::from("test");
+//! loop {
+//!     // Perform get request
+//!     let r = store.get(&path).await.unwrap();
+//!
+//!     // Save version information fetched
+//!     let version = UpdateVersion {
+//!         e_tag: r.meta.e_tag.clone(),
+//!         version: r.meta.version.clone(),
+//!     };
+//!
+//!     // Compute new version of object contents
+//!     let new = do_update(r.bytes().await.unwrap());
+//!
+//!     // Attempt to commit transaction
+//!     match store.put_opts(&path, new, 
PutMode::Update(version).into()).await {
+//!         Ok(_) => break, // Successfully committed
+//!         Err(Error::Precondition { .. }) => continue, // Transaction 
conflict, try again

Review Comment:
   ```suggestion
   //!         Err(Error::Precondition { .. }) => continue, // object has been 
changed, try again
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to