This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 520ad68beec Export object_store integration tests (#5709)
520ad68beec is described below

commit 520ad68beeceb8888e926d43218a851c4ef65ce3
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sat May 4 22:13:25 2024 +0100

    Export object_store integration tests (#5709)
    
    * Export object_store integration tests
    
    * Clippy
    
    * Clippy
    
    * Even more clippy
    
    * Format
---
 object_store/Cargo.toml            |    1 +
 object_store/src/aws/builder.rs    |   12 -
 object_store/src/aws/client.rs     |    6 -
 object_store/src/aws/dynamo.rs     |   12 -
 object_store/src/aws/mod.rs        |   18 +-
 object_store/src/azure/builder.rs  |    5 -
 object_store/src/azure/client.rs   |   11 -
 object_store/src/azure/mod.rs      |    3 +-
 object_store/src/chunked.rs        |    2 +-
 object_store/src/client/list.rs    |    1 +
 object_store/src/gcp/builder.rs    |    5 -
 object_store/src/gcp/client.rs     |    6 -
 object_store/src/gcp/credential.rs |    2 +-
 object_store/src/gcp/mod.rs        |   15 +-
 object_store/src/http/mod.rs       |    6 +-
 object_store/src/integration.rs    | 1082 ++++++++++++++++++++++++++++++++++++
 object_store/src/lib.rs            | 1082 +-----------------------------------
 object_store/src/limit.rs          |    2 +-
 object_store/src/local.rs          |    3 +-
 object_store/src/memory.rs         |    2 +-
 object_store/src/parse.rs          |    6 -
 object_store/src/prefix.rs         |    3 +-
 object_store/src/throttle.rs       |    2 +-
 23 files changed, 1130 insertions(+), 1157 deletions(-)

diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index ca1e5b32703..c61946f7f83 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -66,6 +66,7 @@ gcp = ["cloud", "rustls-pemfile"]
 aws = ["cloud", "md-5"]
 http = ["cloud"]
 tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
+integration = []
 
 [dev-dependencies] # In alphabetical order
 futures-test = "0.3"
diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs
index 664e1836460..1e42093fd8a 100644
--- a/object_store/src/aws/builder.rs
+++ b/object_store/src/aws/builder.rs
@@ -70,21 +70,9 @@ enum Error {
     #[snafu(display("Configuration key: '{}' is not known.", key))]
     UnknownConfigurationKey { key: String },
 
-    #[snafu(display("Bucket '{}' not found", bucket))]
-    BucketNotFound { bucket: String },
-
-    #[snafu(display("Failed to resolve region for bucket '{}'", bucket))]
-    ResolveRegion {
-        bucket: String,
-        source: reqwest::Error,
-    },
-
     #[snafu(display("Invalid Zone suffix for bucket '{bucket}'"))]
     ZoneSuffix { bucket: String },
 
-    #[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
-    RegionParse { bucket: String },
-
     #[snafu(display("Invalid encryption type: {}. Valid values are \"AES256\", \"sse:kms\", and \"sse:kms:dsse\".", passed))]
     InvalidEncryptionType { passed: String },
 
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 24247688e86..98226c4b6b1 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -66,12 +66,6 @@ const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
 pub(crate) enum Error {
-    #[snafu(display("Error fetching get response body {}: {}", path, source))]
-    GetResponseBody {
-        source: reqwest::Error,
-        path: String,
-    },
-
     #[snafu(display("Error performing DeleteObjects request: {}", source))]
     DeleteObjectsRequest { source: crate::client::retry::Error },
 
diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs
index 2390187e7f7..9de67e5aaa4 100644
--- a/object_store/src/aws/dynamo.rs
+++ b/object_store/src/aws/dynamo.rs
@@ -451,18 +451,6 @@ struct PutItem<'a> {
     return_values_on_condition_check_failure: Option<ReturnValues>,
 }
 
-/// A DynamoDB [GetItem] payload
-///
-/// [GetItem]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html
-#[derive(Serialize)]
-#[serde(rename_all = "PascalCase")]
-struct GetItem<'a> {
-    /// The table name
-    table_name: &'a str,
-    /// The primary key
-    key: Map<'a, &'a str, AttributeValue<'a>>,
-}
-
 #[derive(Deserialize)]
 struct ErrorResponse<'a> {
     #[serde(rename = "__type")]
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 5bc6d56e7c7..f5204a5365e 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -409,14 +409,16 @@ impl MultipartStore for AmazonS3 {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{client::get::GetClient, tests::*};
+    use crate::client::get::GetClient;
+    use crate::integration::*;
+    use crate::tests::*;
     use hyper::HeaderMap;
 
     const NON_EXISTENT_NAME: &str = "nonexistentname";
 
     #[tokio::test]
     async fn s3_test() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let config = AmazonS3Builder::from_env();
 
         let integration = config.build().unwrap();
@@ -475,7 +477,7 @@ mod tests {
 
     #[tokio::test]
     async fn s3_test_get_nonexistent_location() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let integration = AmazonS3Builder::from_env().build().unwrap();
 
         let location = Path::from_iter([NON_EXISTENT_NAME]);
@@ -488,7 +490,7 @@ mod tests {
 
     #[tokio::test]
     async fn s3_test_get_nonexistent_bucket() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
         let integration = config.build().unwrap();
 
@@ -500,7 +502,7 @@ mod tests {
 
     #[tokio::test]
     async fn s3_test_put_nonexistent_bucket() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
         let integration = config.build().unwrap();
 
@@ -513,7 +515,7 @@ mod tests {
 
     #[tokio::test]
     async fn s3_test_delete_nonexistent_location() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let integration = AmazonS3Builder::from_env().build().unwrap();
 
         let location = Path::from_iter([NON_EXISTENT_NAME]);
@@ -523,7 +525,7 @@ mod tests {
 
     #[tokio::test]
     async fn s3_test_delete_nonexistent_bucket() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let config = AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
         let integration = config.build().unwrap();
 
@@ -560,7 +562,7 @@ mod tests {
     }
 
     async fn s3_encryption(store: &AmazonS3) {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
 
         let data = PutPayload::from(vec![3u8; 1024]);
 
diff --git a/object_store/src/azure/builder.rs b/object_store/src/azure/builder.rs
index ee0953456ad..c0c4e8983a0 100644
--- a/object_store/src/azure/builder.rs
+++ b/object_store/src/azure/builder.rs
@@ -89,11 +89,6 @@ enum Error {
 
     #[snafu(display("Configuration key: '{}' is not known.", key))]
     UnknownConfigurationKey { key: String },
-
-    #[snafu(display("Unable to extract metadata from headers: {}", source))]
-    Metadata {
-        source: crate::client::header::Error,
-    },
 }
 
 impl From<Error> for crate::Error {
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 311bd72ff52..be760c7dd36 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -67,12 +67,6 @@ pub(crate) enum Error {
         path: String,
     },
 
-    #[snafu(display("Error getting get response body {}: {}", path, source))]
-    GetResponseBody {
-        source: reqwest::Error,
-        path: String,
-    },
-
     #[snafu(display("Error performing put request {}: {}", path, source))]
     PutRequest {
         source: crate::client::retry::Error,
@@ -94,11 +88,6 @@ pub(crate) enum Error {
     #[snafu(display("Got invalid list response: {}", source))]
     InvalidListResponse { source: quick_xml::de::DeError },
 
-    #[snafu(display("Error authorizing request: {}", source))]
-    Authorization {
-        source: crate::azure::credential::Error,
-    },
-
     #[snafu(display("Unable to extract metadata from headers: {}", source))]
     Metadata {
         source: crate::client::header::Error,
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 755f3b1265f..f89a184f952 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -276,12 +276,13 @@ impl MultipartStore for MicrosoftAzure {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::integration::*;
     use crate::tests::*;
     use bytes::Bytes;
 
     #[tokio::test]
     async fn azure_blob_test() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let integration = MicrosoftAzureBuilder::from_env().build().unwrap();
 
         put_get_delete_list(&integration).await;
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index a3bd7626787..98cc2049801 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -178,10 +178,10 @@ impl ObjectStore for ChunkedStore {
 mod tests {
     use futures::StreamExt;
 
+    use crate::integration::*;
     use crate::local::LocalFileSystem;
     use crate::memory::InMemory;
     use crate::path::Path;
-    use crate::tests::*;
 
     use super::*;
 
diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs
index 371894dfeb7..2dbe20f3f51 100644
--- a/object_store/src/client/list.rs
+++ b/object_store/src/client/list.rs
@@ -48,6 +48,7 @@ pub trait ListClientExt {
 
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
 
+    #[allow(unused)]
     fn list_with_offset(
         &self,
         prefix: Option<&Path>,
diff --git a/object_store/src/gcp/builder.rs b/object_store/src/gcp/builder.rs
index e6da312072c..82dab14437d 100644
--- a/object_store/src/gcp/builder.rs
+++ b/object_store/src/gcp/builder.rs
@@ -60,11 +60,6 @@ enum Error {
     #[snafu(display("Configuration key: '{}' is not known.", key))]
     UnknownConfigurationKey { key: String },
 
-    #[snafu(display("Unable to extract metadata from headers: {}", source))]
-    Metadata {
-        source: crate::client::header::Error,
-    },
-
     #[snafu(display("GCP credential error: {}", source))]
     Credential { source: credential::Error },
 }
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index a5493256546..35c64cc3d2c 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -81,12 +81,6 @@ enum Error {
     #[snafu(display("Got invalid put response: {}", source))]
     InvalidPutResponse { source: quick_xml::de::DeError },
 
-    #[snafu(display("Error performing post request {}: {}", path, source))]
-    PostRequest {
-        source: crate::client::retry::Error,
-        path: String,
-    },
-
     #[snafu(display("Unable to extract metadata from headers: {}", source))]
     Metadata {
         source: crate::client::header::Error,
diff --git a/object_store/src/gcp/credential.rs b/object_store/src/gcp/credential.rs
index d7fc2cea2c7..829db9b1aec 100644
--- a/object_store/src/gcp/credential.rs
+++ b/object_store/src/gcp/credential.rs
@@ -536,7 +536,7 @@ impl ApplicationDefaultCredentials {
             let path = Path::new(&home).join(Self::CREDENTIALS_PATH);
 
             // It's expected for this file to not exist unless it has been explicitly configured by the user.
-            if path.try_exists().unwrap_or(false) {
+            if path.exists() {
                 return read_credentials_file::<Self>(path).map(Some);
             }
         }
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 0ec6e7e8264..039ec46b68c 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -273,6 +273,7 @@ mod test {
 
     use credential::DEFAULT_GCS_BASE_URL;
 
+    use crate::integration::*;
     use crate::tests::*;
 
     use super::*;
@@ -281,7 +282,7 @@ mod test {
 
     #[tokio::test]
     async fn gcs_test() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
 
         put_get_delete_list(&integration).await;
@@ -307,7 +308,7 @@ mod test {
     #[tokio::test]
     #[ignore]
     async fn gcs_test_sign() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
 
         let client = reqwest::Client::new();
@@ -336,7 +337,7 @@ mod test {
 
     #[tokio::test]
     async fn gcs_test_get_nonexistent_location() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
 
         let location = Path::from_iter([NON_EXISTENT_NAME]);
@@ -351,7 +352,7 @@ mod test {
 
     #[tokio::test]
     async fn gcs_test_get_nonexistent_bucket() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let config = GoogleCloudStorageBuilder::from_env();
         let integration = config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
 
@@ -369,7 +370,7 @@ mod test {
 
     #[tokio::test]
     async fn gcs_test_delete_nonexistent_location() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let integration = GoogleCloudStorageBuilder::from_env().build().unwrap();
 
         let location = Path::from_iter([NON_EXISTENT_NAME]);
@@ -383,7 +384,7 @@ mod test {
 
     #[tokio::test]
     async fn gcs_test_delete_nonexistent_bucket() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let config = GoogleCloudStorageBuilder::from_env();
         let integration = config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
 
@@ -398,7 +399,7 @@ mod test {
 
     #[tokio::test]
     async fn gcs_test_put_nonexistent_bucket() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let config = GoogleCloudStorageBuilder::from_env();
         let integration = config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
 
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 404211e578d..4b1c927e74f 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -64,9 +64,6 @@ enum Error {
     Metadata {
         source: crate::client::header::Error,
     },
-
-    #[snafu(display("Request error: {}", source))]
-    Reqwest { source: reqwest::Error },
 }
 
 impl From<Error> for crate::Error {
@@ -249,13 +246,14 @@ impl HttpBuilder {
 
 #[cfg(test)]
 mod tests {
+    use crate::integration::*;
     use crate::tests::*;
 
     use super::*;
 
     #[tokio::test]
     async fn http_test() {
-        crate::test_util::maybe_skip_integration!();
+        maybe_skip_integration!();
         let url = std::env::var("HTTP_URL").expect("HTTP_URL must be set");
         let options = ClientOptions::new().with_allow_http(true);
         let integration = HttpBuilder::new()
diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs
new file mode 100644
index 00000000000..9a7d117158c
--- /dev/null
+++ b/object_store/src/integration.rs
@@ -0,0 +1,1082 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for custom object store implementations
+//!
+//! NB: These tests will delete everything present in the provided [`DynObjectStore`].
+//!
+//! These tests are not a stable part of the public API and breaking changes may be made
+//! in patch releases.
+//!
+//! They are intended solely for testing purposes.
+
+use crate::multipart::MultipartStore;
+use crate::path::Path;
+use crate::{
+    Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange, ObjectStore, PutMode,
+    PutPayload, UpdateVersion, WriteMultipart,
+};
+use bytes::Bytes;
+use futures::stream::FuturesUnordered;
+use futures::{StreamExt, TryStreamExt};
+use rand::distributions::Alphanumeric;
+use rand::{thread_rng, Rng};
+
+pub(crate) async fn flatten_list_stream(
+    storage: &DynObjectStore,
+    prefix: Option<&Path>,
+) -> crate::Result<Vec<Path>> {
+    storage
+        .list(prefix)
+        .map_ok(|meta| meta.location)
+        .try_collect::<Vec<Path>>()
+        .await
+}
+
+/// Tests basic read/write and listing operations
+pub async fn put_get_delete_list(storage: &DynObjectStore) {
+    delete_fixtures(storage).await;
+
+    let content_list = flatten_list_stream(storage, None).await.unwrap();
+    assert!(
+        content_list.is_empty(),
+        "Expected list to be empty; found: {content_list:?}"
+    );
+
+    let location = Path::from("test_dir/test_file.json");
+
+    let data = Bytes::from("arbitrary data");
+    storage.put(&location, data.clone().into()).await.unwrap();
+
+    let root = Path::from("/");
+
+    // List everything
+    let content_list = flatten_list_stream(storage, None).await.unwrap();
+    assert_eq!(content_list, &[location.clone()]);
+
+    // Should behave the same as no prefix
+    let content_list = flatten_list_stream(storage, Some(&root)).await.unwrap();
+    assert_eq!(content_list, &[location.clone()]);
+
+    // List with delimiter
+    let result = storage.list_with_delimiter(None).await.unwrap();
+    assert_eq!(&result.objects, &[]);
+    assert_eq!(result.common_prefixes.len(), 1);
+    assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
+
+    // Should behave the same as no prefix
+    let result = storage.list_with_delimiter(Some(&root)).await.unwrap();
+    assert!(result.objects.is_empty());
+    assert_eq!(result.common_prefixes.len(), 1);
+    assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
+
+    // Should return not found
+    let err = storage.get(&Path::from("test_dir")).await.unwrap_err();
+    assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+    // Should return not found
+    let err = storage.head(&Path::from("test_dir")).await.unwrap_err();
+    assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+    // List everything starting with a prefix that should return results
+    let prefix = Path::from("test_dir");
+    let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
+    assert_eq!(content_list, &[location.clone()]);
+
+    // List everything starting with a prefix that shouldn't return results
+    let prefix = Path::from("something");
+    let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
+    assert!(content_list.is_empty());
+
+    let read_data = storage.get(&location).await.unwrap().bytes().await.unwrap();
+    assert_eq!(&*read_data, data);
+
+    // Test range request
+    let range = 3..7;
+    let range_result = storage.get_range(&location, range.clone()).await;
+
+    let bytes = range_result.unwrap();
+    assert_eq!(bytes, data.slice(range.clone()));
+
+    let opts = GetOptions {
+        range: Some(GetRange::Bounded(2..5)),
+        ..Default::default()
+    };
+    let result = storage.get_opts(&location, opts).await.unwrap();
+    // Data is `"arbitrary data"`, length 14 bytes
+    assert_eq!(result.meta.size, 14); // Should return full object size (#5272)
+    assert_eq!(result.range, 2..5);
+    let bytes = result.bytes().await.unwrap();
+    assert_eq!(bytes, b"bit".as_ref());
+
+    let out_of_range = 200..300;
+    let out_of_range_result = storage.get_range(&location, out_of_range).await;
+
+    // Should be a non-fatal error
+    out_of_range_result.unwrap_err();
+
+    let opts = GetOptions {
+        range: Some(GetRange::Bounded(2..100)),
+        ..Default::default()
+    };
+    let result = storage.get_opts(&location, opts).await.unwrap();
+    assert_eq!(result.range, 2..14);
+    assert_eq!(result.meta.size, 14);
+    let bytes = result.bytes().await.unwrap();
+    assert_eq!(bytes, b"bitrary data".as_ref());
+
+    let opts = GetOptions {
+        range: Some(GetRange::Suffix(2)),
+        ..Default::default()
+    };
+    match storage.get_opts(&location, opts).await {
+        Ok(result) => {
+            assert_eq!(result.range, 12..14);
+            assert_eq!(result.meta.size, 14);
+            let bytes = result.bytes().await.unwrap();
+            assert_eq!(bytes, b"ta".as_ref());
+        }
+        Err(Error::NotSupported { .. }) => {}
+        Err(e) => panic!("{e}"),
+    }
+
+    let opts = GetOptions {
+        range: Some(GetRange::Suffix(100)),
+        ..Default::default()
+    };
+    match storage.get_opts(&location, opts).await {
+        Ok(result) => {
+            assert_eq!(result.range, 0..14);
+            assert_eq!(result.meta.size, 14);
+            let bytes = result.bytes().await.unwrap();
+            assert_eq!(bytes, b"arbitrary data".as_ref());
+        }
+        Err(Error::NotSupported { .. }) => {}
+        Err(e) => panic!("{e}"),
+    }
+
+    let opts = GetOptions {
+        range: Some(GetRange::Offset(3)),
+        ..Default::default()
+    };
+    let result = storage.get_opts(&location, opts).await.unwrap();
+    assert_eq!(result.range, 3..14);
+    assert_eq!(result.meta.size, 14);
+    let bytes = result.bytes().await.unwrap();
+    assert_eq!(bytes, b"itrary data".as_ref());
+
+    let opts = GetOptions {
+        range: Some(GetRange::Offset(100)),
+        ..Default::default()
+    };
+    storage.get_opts(&location, opts).await.unwrap_err();
+
+    let ranges = vec![0..1, 2..3, 0..5];
+    let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
+    for (range, bytes) in ranges.iter().zip(bytes) {
+        assert_eq!(bytes, data.slice(range.clone()))
+    }
+
+    let head = storage.head(&location).await.unwrap();
+    assert_eq!(head.size, data.len());
+
+    storage.delete(&location).await.unwrap();
+
+    let content_list = flatten_list_stream(storage, None).await.unwrap();
+    assert!(content_list.is_empty());
+
+    let err = storage.get(&location).await.unwrap_err();
+    assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+    let err = storage.head(&location).await.unwrap_err();
+    assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+    // Test handling of paths containing an encoded delimiter
+
+    let file_with_delimiter = Path::from_iter(["a", "b/c", "foo.file"]);
+    storage
+        .put(&file_with_delimiter, "arbitrary".into())
+        .await
+        .unwrap();
+
+    let files = flatten_list_stream(storage, None).await.unwrap();
+    assert_eq!(files, vec![file_with_delimiter.clone()]);
+
+    let files = flatten_list_stream(storage, Some(&Path::from("a/b")))
+        .await
+        .unwrap();
+    assert!(files.is_empty());
+
+    let files = storage
+        .list_with_delimiter(Some(&Path::from("a/b")))
+        .await
+        .unwrap();
+    assert!(files.common_prefixes.is_empty());
+    assert!(files.objects.is_empty());
+
+    let files = storage
+        .list_with_delimiter(Some(&Path::from("a")))
+        .await
+        .unwrap();
+    assert_eq!(files.common_prefixes, vec![Path::from_iter(["a", "b/c"])]);
+    assert!(files.objects.is_empty());
+
+    let files = storage
+        .list_with_delimiter(Some(&Path::from_iter(["a", "b/c"])))
+        .await
+        .unwrap();
+    assert!(files.common_prefixes.is_empty());
+    assert_eq!(files.objects.len(), 1);
+    assert_eq!(files.objects[0].location, file_with_delimiter);
+
+    storage.delete(&file_with_delimiter).await.unwrap();
+
+    // Test handling of paths containing non-ASCII characters, e.g. emoji
+
+    let emoji_prefix = Path::from("🙀");
+    let emoji_file = Path::from("🙀/😀.parquet");
+    storage.put(&emoji_file, "arbitrary".into()).await.unwrap();
+
+    storage.head(&emoji_file).await.unwrap();
+    storage
+        .get(&emoji_file)
+        .await
+        .unwrap()
+        .bytes()
+        .await
+        .unwrap();
+
+    let files = flatten_list_stream(storage, Some(&emoji_prefix))
+        .await
+        .unwrap();
+
+    assert_eq!(files, vec![emoji_file.clone()]);
+
+    let dst = Path::from("foo.parquet");
+    storage.copy(&emoji_file, &dst).await.unwrap();
+    let mut files = flatten_list_stream(storage, None).await.unwrap();
+    files.sort_unstable();
+    assert_eq!(files, vec![emoji_file.clone(), dst.clone()]);
+
+    let dst2 = Path::from("new/nested/foo.parquet");
+    storage.copy(&emoji_file, &dst2).await.unwrap();
+    let mut files = flatten_list_stream(storage, None).await.unwrap();
+    files.sort_unstable();
+    assert_eq!(files, vec![emoji_file.clone(), dst.clone(), dst2.clone()]);
+
+    let dst3 = Path::from("new/nested2/bar.parquet");
+    storage.rename(&dst, &dst3).await.unwrap();
+    let mut files = flatten_list_stream(storage, None).await.unwrap();
+    files.sort_unstable();
+    assert_eq!(files, vec![emoji_file.clone(), dst2.clone(), dst3.clone()]);
+
+    let err = storage.head(&dst).await.unwrap_err();
+    assert!(matches!(err, Error::NotFound { .. }));
+
+    storage.delete(&emoji_file).await.unwrap();
+    storage.delete(&dst3).await.unwrap();
+    storage.delete(&dst2).await.unwrap();
+    let files = flatten_list_stream(storage, Some(&emoji_prefix))
+        .await
+        .unwrap();
+    assert!(files.is_empty());
+
+    // Test handling of paths containing percent-encoded sequences
+
+    // "HELLO" percent encoded
+    let hello_prefix = Path::parse("%48%45%4C%4C%4F").unwrap();
+    let path = hello_prefix.child("foo.parquet");
+
+    storage.put(&path, vec![0, 1].into()).await.unwrap();
+    let files = flatten_list_stream(storage, Some(&hello_prefix))
+        .await
+        .unwrap();
+    assert_eq!(files, vec![path.clone()]);
+
+    // Cannot list by decoded representation
+    let files = flatten_list_stream(storage, Some(&Path::from("HELLO")))
+        .await
+        .unwrap();
+    assert!(files.is_empty());
+
+    // Cannot access by decoded representation
+    let err = storage
+        .head(&Path::from("HELLO/foo.parquet"))
+        .await
+        .unwrap_err();
+    assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+    storage.delete(&path).await.unwrap();
+
+    // Test handling of unicode paths
+    let path = Path::parse("🇦🇺/$shenanigans@@~.txt").unwrap();
+    storage.put(&path, "test".into()).await.unwrap();
+
+    let r = storage.get(&path).await.unwrap();
+    assert_eq!(r.bytes().await.unwrap(), "test");
+
+    let dir = Path::parse("🇦🇺").unwrap();
+    let r = storage.list_with_delimiter(None).await.unwrap();
+    assert!(r.common_prefixes.contains(&dir));
+
+    let r = storage.list_with_delimiter(Some(&dir)).await.unwrap();
+    assert_eq!(r.objects.len(), 1);
+    assert_eq!(r.objects[0].location, path);
+
+    storage.delete(&path).await.unwrap();
+
+    // Can also write non-percent encoded sequences
+    let path = Path::parse("%Q.parquet").unwrap();
+    storage.put(&path, vec![0, 1].into()).await.unwrap();
+
+    let files = flatten_list_stream(storage, None).await.unwrap();
+    assert_eq!(files, vec![path.clone()]);
+
+    storage.delete(&path).await.unwrap();
+
+    let path = Path::parse("foo bar/I contain spaces.parquet").unwrap();
+    storage.put(&path, vec![0, 1].into()).await.unwrap();
+    storage.head(&path).await.unwrap();
+
+    let files = flatten_list_stream(storage, Some(&Path::from("foo bar")))
+        .await
+        .unwrap();
+    assert_eq!(files, vec![path.clone()]);
+
+    storage.delete(&path).await.unwrap();
+
+    let files = flatten_list_stream(storage, None).await.unwrap();
+    assert!(files.is_empty(), "{files:?}");
+
+    // Test list order
+    let files = vec![
+        Path::from("a a/b.file"),
+        Path::parse("a%2Fa.file").unwrap(),
+        Path::from("a/😀.file"),
+        Path::from("a/a file"),
+        Path::parse("a/a%2F.file").unwrap(),
+        Path::from("a/a.file"),
+        Path::from("a/a/b.file"),
+        Path::from("a/b.file"),
+        Path::from("aa/a.file"),
+        Path::from("ab/a.file"),
+    ];
+
+    for file in &files {
+        storage.put(file, "foo".into()).await.unwrap();
+    }
+
+    let cases = [
+        (None, Path::from("a")),
+        (None, Path::from("a/a file")),
+        (None, Path::from("a/a/b.file")),
+        (None, Path::from("ab/a.file")),
+        (None, Path::from("a%2Fa.file")),
+        (None, Path::from("a/😀.file")),
+        (Some(Path::from("a")), Path::from("")),
+        (Some(Path::from("a")), Path::from("a")),
+        (Some(Path::from("a")), Path::from("a/😀")),
+        (Some(Path::from("a")), Path::from("a/😀.file")),
+        (Some(Path::from("a")), Path::from("a/b")),
+        (Some(Path::from("a")), Path::from("a/a/b.file")),
+    ];
+
+    for (prefix, offset) in cases {
+        let s = storage.list_with_offset(prefix.as_ref(), &offset);
+        let mut actual: Vec<_> = s.map_ok(|x| x.location).try_collect().await.unwrap();
+
+        actual.sort_unstable();
+
+        let expected: Vec<_> = files
+            .iter()
+            .filter(|x| {
+                let prefix_match = prefix.as_ref().map(|p| x.prefix_matches(p)).unwrap_or(true);
+                prefix_match && *x > &offset
+            })
+            .cloned()
+            .collect();
+
+        assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
+    }
+
+    // Test bulk delete
+    let paths = vec![
+        Path::from("a/a.file"),
+        Path::from("a/a/b.file"),
+        Path::from("aa/a.file"),
+        Path::from("does_not_exist"),
+        Path::from("I'm a < & weird path"),
+        Path::from("ab/a.file"),
+        Path::from("a/😀.file"),
+    ];
+
+    storage.put(&paths[4], "foo".into()).await.unwrap();
+
+    let out_paths = storage
+        .delete_stream(futures::stream::iter(paths.clone()).map(Ok).boxed())
+        .collect::<Vec<_>>()
+        .await;
+
+    assert_eq!(out_paths.len(), paths.len());
+
+    let expect_errors = [3];
+
+    for (i, input_path) in paths.iter().enumerate() {
+        let err = storage.head(input_path).await.unwrap_err();
+        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+        if expect_errors.contains(&i) {
+            // Some object stores will report NotFound, but others (such as S3) will
+            // report success regardless.
+            match &out_paths[i] {
+                Err(Error::NotFound { path: out_path, .. }) => {
+                    assert!(out_path.ends_with(&input_path.to_string()));
+                }
+                Ok(out_path) => {
+                    assert_eq!(out_path, input_path);
+                }
+                _ => panic!("unexpected error"),
+            }
+        } else {
+            assert_eq!(out_paths[i].as_ref().unwrap(), input_path);
+        }
+    }
+
+    delete_fixtures(storage).await;
+
+    let path = Path::from("empty");
+    storage.put(&path, PutPayload::default()).await.unwrap();
+    let meta = storage.head(&path).await.unwrap();
+    assert_eq!(meta.size, 0);
+    let data = storage.get(&path).await.unwrap().bytes().await.unwrap();
+    assert_eq!(data.len(), 0);
+
+    storage.delete(&path).await.unwrap();
+}
+
/// Tests the ability to read and write [`Attributes`]
///
/// Stores that do not support attributes are expected to return
/// [`Error::NotImplemented`], which this test tolerates.
pub async fn put_get_attributes(integration: &dyn ObjectStore) {
    // Test handling of attributes
    let attributes = Attributes::from_iter([
        (Attribute::CacheControl, "max-age=604800"),
        (
            Attribute::ContentDisposition,
            r#"attachment; filename="test.html""#,
        ),
        (Attribute::ContentEncoding, "gzip"),
        (Attribute::ContentLanguage, "en-US"),
        (Attribute::ContentType, "text/html; charset=utf-8"),
    ]);

    let path = Path::from("attributes");
    let opts = attributes.clone().into();
    // Attributes supplied on a simple put should round-trip through get
    match integration.put_opts(&path, "foo".into(), opts).await {
        Ok(_) => {
            let r = integration.get(&path).await.unwrap();
            assert_eq!(r.attributes, attributes);
        }
        Err(Error::NotImplemented) => {}
        Err(e) => panic!("{e}"),
    }

    let opts = attributes.clone().into();
    // The same round-trip should hold when writing via a multipart upload
    match integration.put_multipart_opts(&path, opts).await {
        Ok(mut w) => {
            w.put_part("foo".into()).await.unwrap();
            w.complete().await.unwrap();

            let r = integration.get(&path).await.unwrap();
            assert_eq!(r.attributes, attributes);
        }
        Err(Error::NotImplemented) => {}
        Err(e) => panic!("{e}"),
    }
}
+
/// Tests conditional read requests
///
/// Exercises `if_unmodified_since` / `if_modified_since` /
/// `if_match` / `if_none_match` / `version` handling of [`GetOptions`].
/// Stores that do not support a given precondition may return
/// [`Error::NotSupported`], which is tolerated.
pub async fn get_opts(storage: &dyn ObjectStore) {
    let path = Path::from("test");
    storage.put(&path, "foo".into()).await.unwrap();
    let meta = storage.head(&path).await.unwrap();

    // if_unmodified_since equal to the object's own timestamp: read succeeds
    let options = GetOptions {
        if_unmodified_since: Some(meta.last_modified),
        ..GetOptions::default()
    };
    match storage.get_opts(&path, options).await {
        Ok(_) | Err(Error::NotSupported { .. }) => {}
        Err(e) => panic!("{e}"),
    }

    // if_unmodified_since in the future: object is unmodified since then, so succeeds
    let options = GetOptions {
        if_unmodified_since: Some(meta.last_modified + chrono::Duration::try_hours(10).unwrap()),
        ..GetOptions::default()
    };
    match storage.get_opts(&path, options).await {
        Ok(_) | Err(Error::NotSupported { .. }) => {}
        Err(e) => panic!("{e}"),
    }

    // if_unmodified_since in the past: the object was modified after that
    // instant, so the precondition must fail
    let options = GetOptions {
        if_unmodified_since: Some(meta.last_modified - chrono::Duration::try_hours(10).unwrap()),
        ..GetOptions::default()
    };
    match storage.get_opts(&path, options).await {
        Err(Error::Precondition { .. } | Error::NotSupported { .. }) => {}
        d => panic!("{d:?}"),
    }

    // if_modified_since equal to the object's timestamp: not modified since then
    let options = GetOptions {
        if_modified_since: Some(meta.last_modified),
        ..GetOptions::default()
    };
    match storage.get_opts(&path, options).await {
        Err(Error::NotModified { .. } | Error::NotSupported { .. }) => {}
        d => panic!("{d:?}"),
    }

    // if_modified_since in the past: the object has changed since, so succeeds
    let options = GetOptions {
        if_modified_since: Some(meta.last_modified - chrono::Duration::try_hours(10).unwrap()),
        ..GetOptions::default()
    };
    match storage.get_opts(&path, options).await {
        Ok(_) | Err(Error::NotSupported { .. }) => {}
        Err(e) => panic!("{e}"),
    }

    // if_match with the current etag succeeds
    let tag = meta.e_tag.unwrap();
    let options = GetOptions {
        if_match: Some(tag.clone()),
        ..GetOptions::default()
    };
    storage.get_opts(&path, options).await.unwrap();

    // if_match with a non-matching etag fails the precondition
    let options = GetOptions {
        if_match: Some("invalid".to_string()),
        ..GetOptions::default()
    };
    let err = storage.get_opts(&path, options).await.unwrap_err();
    assert!(matches!(err, Error::Precondition { .. }), "{err}");

    // if_none_match with the current etag reports NotModified
    let options = GetOptions {
        if_none_match: Some(tag.clone()),
        ..GetOptions::default()
    };
    let err = storage.get_opts(&path, options).await.unwrap_err();
    assert!(matches!(err, Error::NotModified { .. }), "{err}");

    // if_none_match with a non-matching etag succeeds
    let options = GetOptions {
        if_none_match: Some("invalid".to_string()),
        ..GetOptions::default()
    };
    storage.get_opts(&path, options).await.unwrap();

    // Overwriting the object must produce a fresh, distinct etag
    let result = storage.put(&path, "test".into()).await.unwrap();
    let new_tag = result.e_tag.unwrap();
    assert_ne!(tag, new_tag);

    let meta = storage.head(&path).await.unwrap();
    assert_eq!(meta.e_tag.unwrap(), new_tag);

    // if_match with the new etag succeeds...
    let options = GetOptions {
        if_match: Some(new_tag),
        ..GetOptions::default()
    };
    storage.get_opts(&path, options).await.unwrap();

    // ...whilst the stale etag now fails the precondition
    let options = GetOptions {
        if_match: Some(tag),
        ..GetOptions::default()
    };
    let err = storage.get_opts(&path, options).await.unwrap_err();
    assert!(matches!(err, Error::Precondition { .. }), "{err}");

    // Stores that report object versions should allow reading back an old version
    if let Some(version) = meta.version {
        storage.put(&path, "bar".into()).await.unwrap();

        let options = GetOptions {
            version: Some(version),
            ..GetOptions::default()
        };

        // Can retrieve previous version
        let get_opts = storage.get_opts(&path, options).await.unwrap();
        let old = get_opts.bytes().await.unwrap();
        assert_eq!(old, b"test".as_slice());

        // Current version contains the updated data
        let current = storage.get(&path).await.unwrap().bytes().await.unwrap();
        assert_eq!(&current, b"bar".as_slice());
    }
}
+
/// Tests conditional writes
///
/// Verifies [`PutMode::Create`] create-if-absent semantics, and — when
/// `supports_update` is set — [`PutMode::Update`] compare-and-swap semantics,
/// including a multi-worker optimistic-concurrency counter race.
pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
    // When using DynamoCommit repeated runs of this test will produce the same sequence of records in DynamoDB
    // As a result each conditional operation will need to wait for the lease to timeout before proceeding
    // One solution would be to clear DynamoDB before each test, but this would require non-trivial additional code
    // so we instead just generate a random suffix for the filenames
    let rng = thread_rng();
    let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();

    delete_fixtures(storage).await;
    let path = Path::from(format!("put_opts_{suffix}"));

    // Create succeeds while the object is absent...
    let v1 = storage
        .put_opts(&path, "a".into(), PutMode::Create.into())
        .await
        .unwrap();

    // ...and fails with AlreadyExists once it is present
    let err = storage
        .put_opts(&path, "b".into(), PutMode::Create.into())
        .await
        .unwrap_err();
    assert!(matches!(err, Error::AlreadyExists { .. }), "{err}");

    // The failed create must not have clobbered the original contents
    let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
    assert_eq!(b.as_ref(), b"a");

    // The remainder exercises PutMode::Update, which not all stores implement
    if !supports_update {
        return;
    }

    // Update conditioned on the current version succeeds
    let v2 = storage
        .put_opts(&path, "c".into(), PutMode::Update(v1.clone().into()).into())
        .await
        .unwrap();

    let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
    assert_eq!(b.as_ref(), b"c");

    // Update conditioned on a stale version fails the precondition
    let err = storage
        .put_opts(&path, "d".into(), PutMode::Update(v1.into()).into())
        .await
        .unwrap_err();
    assert!(matches!(err, Error::Precondition { .. }), "{err}");

    storage
        .put_opts(&path, "e".into(), PutMode::Update(v2.clone().into()).into())
        .await
        .unwrap();

    let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
    assert_eq!(b.as_ref(), b"e");

    // Update not exists
    let path = Path::from("I don't exist");
    let err = storage
        .put_opts(&path, "e".into(), PutMode::Update(v2.into()).into())
        .await
        .unwrap_err();
    assert!(matches!(err, Error::Precondition { .. }), "{err}");

    // Stress the conditional machinery: several workers perform
    // read-modify-write increments of a shared counter; conditional puts
    // must make the final total exact despite the races
    const NUM_WORKERS: usize = 5;
    const NUM_INCREMENTS: usize = 10;

    let path = Path::from(format!("RACE-{suffix}"));
    let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
        .map(|_| async {
            for _ in 0..NUM_INCREMENTS {
                loop {
                    match storage.get(&path).await {
                        Ok(r) => {
                            // Condition the write on the version we just read
                            let mode = PutMode::Update(UpdateVersion {
                                e_tag: r.meta.e_tag.clone(),
                                version: r.meta.version.clone(),
                            });

                            let b = r.bytes().await.unwrap();
                            let v: usize = std::str::from_utf8(&b).unwrap().parse().unwrap();
                            let new = (v + 1).to_string();

                            match storage.put_opts(&path, new.into(), mode.into()).await {
                                Ok(_) => break,
                                // Lost the race: re-read and retry
                                Err(Error::Precondition { .. }) => continue,
                                Err(e) => return Err(e),
                            }
                        }
                        Err(Error::NotFound { .. }) => {
                            // Counter not yet created: race other workers to create it
                            let mode = PutMode::Create;
                            match storage.put_opts(&path, "1".into(), mode.into()).await {
                                Ok(_) => break,
                                Err(Error::AlreadyExists { .. }) => continue,
                                Err(e) => return Err(e),
                            }
                        }
                        Err(e) => return Err(e),
                    }
                }
            }
            Ok(())
        })
        .collect();

    // Drive all workers to completion, propagating the first error
    while futures.next().await.transpose().unwrap().is_some() {}

    // Every increment must be accounted for
    let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
    let v = std::str::from_utf8(&b).unwrap().parse::<usize>().unwrap();
    assert_eq!(v, NUM_WORKERS * NUM_INCREMENTS);
}
+
+/// Returns a chunk of length `chunk_length`
+fn get_chunk(chunk_length: usize) -> Bytes {
+    let mut data = vec![0_u8; chunk_length];
+    let mut rng = thread_rng();
+    // Set a random selection of bytes
+    for _ in 0..1000 {
+        data[rng.gen_range(0..chunk_length)] = rng.gen();
+    }
+    data.into()
+}
+
/// Returns `num_chunks` random chunks, each of length `chunk_length`
fn get_chunks(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
    (0..num_chunks).map(|_| get_chunk(chunk_length)).collect()
}
+
/// Tests the ability to perform multipart writes
///
/// Checks that an in-progress upload is invisible until completed,
/// that completed uploads round-trip their bytes, and that aborting
/// (both empty and in-progress uploads) leaves no object behind.
pub async fn stream_get(storage: &DynObjectStore) {
    let location = Path::from("test_dir/test_upload_file.txt");

    // Can write to storage
    let data = get_chunks(5 * 1024 * 1024, 3);
    let bytes_expected = data.concat();
    let mut upload = storage.put_multipart(&location).await.unwrap();
    // Upload all parts concurrently
    let uploads = data.into_iter().map(|x| upload.put_part(x.into()));
    futures::future::try_join_all(uploads).await.unwrap();

    // Object should not yet exist in store
    let meta_res = storage.head(&location).await;
    assert!(meta_res.is_err());
    assert!(matches!(
        meta_res.unwrap_err(),
        crate::Error::NotFound { .. }
    ));

    // Nor should it appear in any listing before completion
    let files = flatten_list_stream(storage, None).await.unwrap();
    assert_eq!(&files, &[]);

    let result = storage.list_with_delimiter(None).await.unwrap();
    assert_eq!(&result.objects, &[]);

    upload.complete().await.unwrap();

    // Once completed, the object is visible and byte-identical to the input
    let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
    assert_eq!(bytes_expected, bytes_written);

    // Can overwrite some storage
    // Sizes chosen to ensure we write three parts
    let data = get_chunks(3_200_000, 7);
    let bytes_expected = data.concat();
    let upload = storage.put_multipart(&location).await.unwrap();
    let mut writer = WriteMultipart::new(upload);
    for chunk in &data {
        writer.write(chunk)
    }
    writer.finish().await.unwrap();
    let bytes_written = storage.get(&location).await.unwrap().bytes().await.unwrap();
    assert_eq!(bytes_expected, bytes_written);

    // We can abort an empty write
    let location = Path::from("test_dir/test_abort_upload.txt");
    let mut upload = storage.put_multipart(&location).await.unwrap();
    upload.abort().await.unwrap();
    let get_res = storage.get(&location).await;
    assert!(get_res.is_err());
    assert!(matches!(
        get_res.unwrap_err(),
        crate::Error::NotFound { .. }
    ));

    // We can abort an in-progress write
    let mut upload = storage.put_multipart(&location).await.unwrap();
    upload
        .put_part(data.first().unwrap().clone().into())
        .await
        .unwrap();

    upload.abort().await.unwrap();
    let get_res = storage.get(&location).await;
    assert!(get_res.is_err());
    assert!(matches!(get_res.unwrap_err(), Error::NotFound { .. }));
}
+
/// Tests that directories are transparent
///
/// Prefixes must match on whole path segments, not raw string prefixes,
/// and listing a path that names a file (not a directory) yields nothing.
pub async fn list_uses_directories_correctly(storage: &DynObjectStore) {
    delete_fixtures(storage).await;

    let content_list = flatten_list_stream(storage, None).await.unwrap();
    assert!(
        content_list.is_empty(),
        "Expected list to be empty; found: {content_list:?}"
    );

    // "foo.bar" shares the string prefix "foo" but is a distinct directory
    let location1 = Path::from("foo/x.json");
    let location2 = Path::from("foo.bar/y.json");

    let data = PutPayload::from("arbitrary data");
    storage.put(&location1, data.clone()).await.unwrap();
    storage.put(&location2, data).await.unwrap();

    // Listing under "foo" must not match "foo.bar/..."
    let prefix = Path::from("foo");
    let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
    assert_eq!(content_list, &[location1.clone()]);

    let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
    assert_eq!(result.objects.len(), 1);
    assert_eq!(result.objects[0].location, location1);
    assert_eq!(result.common_prefixes, &[]);

    // Listing an existing path (file) should return an empty list:
    // https://github.com/apache/arrow-rs/issues/3712
    let content_list = flatten_list_stream(storage, Some(&location1))
        .await
        .unwrap();
    assert_eq!(content_list, &[]);

    let list = storage.list_with_delimiter(Some(&location1)).await.unwrap();
    assert_eq!(list.objects, &[]);
    assert_eq!(list.common_prefixes, &[]);

    // A prefix that is a partial filename ("foo/x" vs "foo/x.json") matches nothing
    let prefix = Path::from("foo/x");
    let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
    assert_eq!(content_list, &[]);

    let list = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
    assert_eq!(list.objects, &[]);
    assert_eq!(list.common_prefixes, &[]);
}
+
/// Tests listing with delimiter
///
/// Verifies that delimiter listing groups immediate children into
/// `common_prefixes`, returns only direct-child objects, and matches
/// prefixes on whole path segments.
pub async fn list_with_delimiter(storage: &DynObjectStore) {
    delete_fixtures(storage).await;

    // ==================== check: store is empty ====================
    let content_list = flatten_list_stream(storage, None).await.unwrap();
    assert!(content_list.is_empty());

    // ==================== do: create files ====================
    let data = Bytes::from("arbitrary data");

    let files: Vec<_> = [
        "test_file",
        "mydb/wb/000/000/000.segment",
        "mydb/wb/000/000/001.segment",
        "mydb/wb/000/000/002.segment",
        "mydb/wb/001/001/000.segment",
        "mydb/wb/foo.json",
        "mydb/wbwbwb/111/222/333.segment",
        "mydb/data/whatevs",
    ]
    .iter()
    .map(|&s| Path::from(s))
    .collect();

    for f in &files {
        storage.put(f, data.clone().into()).await.unwrap();
    }

    // ==================== check: prefix-list `mydb/wb` (directory) ====================
    // Expect the two immediate sub-directories as common prefixes plus the one
    // file directly under `mydb/wb`; `mydb/wbwbwb` must NOT match
    let prefix = Path::from("mydb/wb");

    let expected_000 = Path::from("mydb/wb/000");
    let expected_001 = Path::from("mydb/wb/001");
    let expected_location = Path::from("mydb/wb/foo.json");

    let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();

    assert_eq!(result.common_prefixes, vec![expected_000, expected_001]);
    assert_eq!(result.objects.len(), 1);

    let object = &result.objects[0];

    assert_eq!(object.location, expected_location);
    assert_eq!(object.size, data.len());

    // ==================== check: prefix-list `mydb/wb/000/000/001` (partial filename doesn't match) ====================
    let prefix = Path::from("mydb/wb/000/000/001");

    let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
    assert!(result.common_prefixes.is_empty());
    assert_eq!(result.objects.len(), 0);

    // ==================== check: prefix-list `not_there` (non-existing prefix) ====================
    let prefix = Path::from("not_there");

    let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
    assert!(result.common_prefixes.is_empty());
    assert!(result.objects.is_empty());

    // ==================== do: remove all files ====================
    for f in &files {
        storage.delete(f).await.unwrap();
    }

    // ==================== check: store is empty ====================
    let content_list = flatten_list_stream(storage, None).await.unwrap();
    assert!(content_list.is_empty());
}
+
+/// Tests fetching a non-existent object returns a not found error
+pub async fn get_nonexistent_object(
+    storage: &DynObjectStore,
+    location: Option<Path>,
+) -> crate::Result<Bytes> {
+    let location = location.unwrap_or_else(|| 
Path::from("this_file_should_not_exist"));
+
+    let err = storage.head(&location).await.unwrap_err();
+    assert!(matches!(err, Error::NotFound { .. }));
+
+    storage.get(&location).await?.bytes().await
+}
+
+/// Tests copying
+pub async fn rename_and_copy(storage: &DynObjectStore) {
+    // Create two objects
+    let path1 = Path::from("test1");
+    let path2 = Path::from("test2");
+    let contents1 = Bytes::from("cats");
+    let contents2 = Bytes::from("dogs");
+
+    // copy() make both objects identical
+    storage.put(&path1, contents1.clone().into()).await.unwrap();
+    storage.put(&path2, contents2.clone().into()).await.unwrap();
+    storage.copy(&path1, &path2).await.unwrap();
+    let new_contents = 
storage.get(&path2).await.unwrap().bytes().await.unwrap();
+    assert_eq!(&new_contents, &contents1);
+
+    // rename() copies contents and deletes original
+    storage.put(&path1, contents1.clone().into()).await.unwrap();
+    storage.put(&path2, contents2.clone().into()).await.unwrap();
+    storage.rename(&path1, &path2).await.unwrap();
+    let new_contents = 
storage.get(&path2).await.unwrap().bytes().await.unwrap();
+    assert_eq!(&new_contents, &contents1);
+    let result = storage.get(&path1).await;
+    assert!(result.is_err());
+    assert!(matches!(result.unwrap_err(), Error::NotFound { .. }));
+
+    // Clean up
+    storage.delete(&path2).await.unwrap();
+}
+
+/// Tests copy if not exists
+pub async fn copy_if_not_exists(storage: &DynObjectStore) {
+    // Create two objects
+    let path1 = Path::from("test1");
+    let path2 = Path::from("not_exists_nested/test2");
+    let contents1 = Bytes::from("cats");
+    let contents2 = Bytes::from("dogs");
+
+    // copy_if_not_exists() errors if destination already exists
+    storage.put(&path1, contents1.clone().into()).await.unwrap();
+    storage.put(&path2, contents2.clone().into()).await.unwrap();
+    let result = storage.copy_if_not_exists(&path1, &path2).await;
+    assert!(result.is_err());
+    assert!(matches!(
+        result.unwrap_err(),
+        crate::Error::AlreadyExists { .. }
+    ));
+
+    // copy_if_not_exists() copies contents and allows deleting original
+    storage.delete(&path2).await.unwrap();
+    storage.copy_if_not_exists(&path1, &path2).await.unwrap();
+    storage.delete(&path1).await.unwrap();
+    let new_contents = 
storage.get(&path2).await.unwrap().bytes().await.unwrap();
+    assert_eq!(&new_contents, &contents1);
+    let result = storage.get(&path1).await;
+    assert!(result.is_err());
+    assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+    // Clean up
+    storage.delete(&path2).await.unwrap();
+}
+
+/// Tests copy and renaming behaviour of non-existent objects
+pub async fn copy_rename_nonexistent_object(storage: &DynObjectStore) {
+    // Create empty source object
+    let path1 = Path::from("test1");
+
+    // Create destination object
+    let path2 = Path::from("test2");
+    storage.put(&path2, "hello".into()).await.unwrap();
+
+    // copy() errors if source does not exist
+    let result = storage.copy(&path1, &path2).await;
+    assert!(result.is_err());
+    assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+    // rename() errors if source does not exist
+    let result = storage.rename(&path1, &path2).await;
+    assert!(result.is_err());
+    assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+    // copy_if_not_exists() errors if source does not exist
+    let result = storage.copy_if_not_exists(&path1, &path2).await;
+    assert!(result.is_err());
+    assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+    // Clean up
+    storage.delete(&path2).await.unwrap();
+}
+
/// Tests [`MultipartStore`]
///
/// Uploads two parts with bounded concurrency and verifies the completed
/// object's size, then checks that completing an upload with zero parts
/// yields an empty object.
pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore) {
    let path = Path::from("test_multipart");
    // 5 MiB parts — presumably chosen to satisfy store minimum part-size
    // limits; confirm against the target store's documentation
    let chunk_size = 5 * 1024 * 1024;

    let chunks = get_chunks(chunk_size, 2);

    let id = multipart.create_multipart(&path).await.unwrap();

    // Upload parts two at a time, preserving part order via enumerate
    let parts: Vec<_> = futures::stream::iter(chunks)
        .enumerate()
        .map(|(idx, b)| multipart.put_part(&path, &id, idx, b.into()))
        .buffered(2)
        .try_collect()
        .await
        .unwrap();

    multipart
        .complete_multipart(&path, &id, parts)
        .await
        .unwrap();

    // The completed object is the concatenation of both parts
    let meta = storage.head(&path).await.unwrap();
    assert_eq!(meta.size, chunk_size * 2);

    // Empty case
    let path = Path::from("test_empty_multipart");

    let id = multipart.create_multipart(&path).await.unwrap();

    let parts = vec![];

    // Completing with no parts should still produce an (empty) object
    multipart
        .complete_multipart(&path, &id, parts)
        .await
        .unwrap();

    let meta = storage.head(&path).await.unwrap();
    assert_eq!(meta.size, 0);
}
+
+async fn delete_fixtures(storage: &DynObjectStore) {
+    let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
+    storage
+        .delete_stream(paths)
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 9a8f77b4d82..bdf870f45ab 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -545,6 +545,9 @@ mod util;
 
 mod attributes;
 
+#[cfg(any(feature = "integration", test))]
+pub mod integration;
+
 pub use attributes::*;
 
 pub use parse::{parse_url, parse_url_opts};
@@ -1285,9 +1288,11 @@ impl From<Error> for std::io::Error {
 }
 
 #[cfg(test)]
-mod test_util {
+mod tests {
     use super::*;
-    use futures::TryStreamExt;
+    use crate::buffered::BufWriter;
+    use chrono::TimeZone;
+    use tokio::io::AsyncWriteExt;
 
     macro_rules! maybe_skip_integration {
         () => {
@@ -1299,1054 +1304,19 @@ mod test_util {
     }
     pub(crate) use maybe_skip_integration;
 
-    pub async fn flatten_list_stream(
-        storage: &DynObjectStore,
-        prefix: Option<&Path>,
-    ) -> Result<Vec<Path>> {
-        storage
-            .list(prefix)
-            .map_ok(|meta| meta.location)
-            .try_collect::<Vec<Path>>()
-            .await
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::buffered::BufWriter;
-    use crate::multipart::MultipartStore;
-    use crate::test_util::flatten_list_stream;
-    use chrono::TimeZone;
-    use futures::stream::FuturesUnordered;
-    use rand::distributions::Alphanumeric;
-    use rand::{thread_rng, Rng};
-    use tokio::io::AsyncWriteExt;
-
-    pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
-        delete_fixtures(storage).await;
-
-        let content_list = flatten_list_stream(storage, None).await.unwrap();
-        assert!(
-            content_list.is_empty(),
-            "Expected list to be empty; found: {content_list:?}"
-        );
-
-        let location = Path::from("test_dir/test_file.json");
-
-        let data = Bytes::from("arbitrary data");
-        storage.put(&location, data.clone().into()).await.unwrap();
-
-        let root = Path::from("/");
-
-        // List everything
-        let content_list = flatten_list_stream(storage, None).await.unwrap();
-        assert_eq!(content_list, &[location.clone()]);
-
-        // Should behave the same as no prefix
-        let content_list = flatten_list_stream(storage, 
Some(&root)).await.unwrap();
-        assert_eq!(content_list, &[location.clone()]);
-
-        // List with delimiter
-        let result = storage.list_with_delimiter(None).await.unwrap();
-        assert_eq!(&result.objects, &[]);
-        assert_eq!(result.common_prefixes.len(), 1);
-        assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
-
-        // Should behave the same as no prefix
-        let result = storage.list_with_delimiter(Some(&root)).await.unwrap();
-        assert!(result.objects.is_empty());
-        assert_eq!(result.common_prefixes.len(), 1);
-        assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
-
-        // Should return not found
-        let err = storage.get(&Path::from("test_dir")).await.unwrap_err();
-        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
-        // Should return not found
-        let err = storage.head(&Path::from("test_dir")).await.unwrap_err();
-        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
-        // List everything starting with a prefix that should return results
-        let prefix = Path::from("test_dir");
-        let content_list = flatten_list_stream(storage, 
Some(&prefix)).await.unwrap();
-        assert_eq!(content_list, &[location.clone()]);
-
-        // List everything starting with a prefix that shouldn't return results
-        let prefix = Path::from("something");
-        let content_list = flatten_list_stream(storage, 
Some(&prefix)).await.unwrap();
-        assert!(content_list.is_empty());
-
-        let read_data = 
storage.get(&location).await.unwrap().bytes().await.unwrap();
-        assert_eq!(&*read_data, data);
-
-        // Test range request
-        let range = 3..7;
-        let range_result = storage.get_range(&location, range.clone()).await;
-
-        let bytes = range_result.unwrap();
-        assert_eq!(bytes, data.slice(range.clone()));
-
-        let opts = GetOptions {
-            range: Some(GetRange::Bounded(2..5)),
-            ..Default::default()
-        };
-        let result = storage.get_opts(&location, opts).await.unwrap();
-        // Data is `"arbitrary data"`, length 14 bytes
-        assert_eq!(result.meta.size, 14); // Should return full object size 
(#5272)
-        assert_eq!(result.range, 2..5);
-        let bytes = result.bytes().await.unwrap();
-        assert_eq!(bytes, b"bit".as_ref());
-
-        let out_of_range = 200..300;
-        let out_of_range_result = storage.get_range(&location, 
out_of_range).await;
-
-        // Should be a non-fatal error
-        out_of_range_result.unwrap_err();
-
-        let opts = GetOptions {
-            range: Some(GetRange::Bounded(2..100)),
-            ..Default::default()
-        };
-        let result = storage.get_opts(&location, opts).await.unwrap();
-        assert_eq!(result.range, 2..14);
-        assert_eq!(result.meta.size, 14);
-        let bytes = result.bytes().await.unwrap();
-        assert_eq!(bytes, b"bitrary data".as_ref());
-
-        let opts = GetOptions {
-            range: Some(GetRange::Suffix(2)),
-            ..Default::default()
-        };
-        match storage.get_opts(&location, opts).await {
-            Ok(result) => {
-                assert_eq!(result.range, 12..14);
-                assert_eq!(result.meta.size, 14);
-                let bytes = result.bytes().await.unwrap();
-                assert_eq!(bytes, b"ta".as_ref());
-            }
-            Err(Error::NotSupported { .. }) => {}
-            Err(e) => panic!("{e}"),
-        }
-
-        let opts = GetOptions {
-            range: Some(GetRange::Suffix(100)),
-            ..Default::default()
-        };
-        match storage.get_opts(&location, opts).await {
-            Ok(result) => {
-                assert_eq!(result.range, 0..14);
-                assert_eq!(result.meta.size, 14);
-                let bytes = result.bytes().await.unwrap();
-                assert_eq!(bytes, b"arbitrary data".as_ref());
-            }
-            Err(Error::NotSupported { .. }) => {}
-            Err(e) => panic!("{e}"),
-        }
-
-        let opts = GetOptions {
-            range: Some(GetRange::Offset(3)),
-            ..Default::default()
-        };
-        let result = storage.get_opts(&location, opts).await.unwrap();
-        assert_eq!(result.range, 3..14);
-        assert_eq!(result.meta.size, 14);
-        let bytes = result.bytes().await.unwrap();
-        assert_eq!(bytes, b"itrary data".as_ref());
-
-        let opts = GetOptions {
-            range: Some(GetRange::Offset(100)),
-            ..Default::default()
-        };
-        storage.get_opts(&location, opts).await.unwrap_err();
-
-        let ranges = vec![0..1, 2..3, 0..5];
-        let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
-        for (range, bytes) in ranges.iter().zip(bytes) {
-            assert_eq!(bytes, data.slice(range.clone()))
-        }
-
-        let head = storage.head(&location).await.unwrap();
-        assert_eq!(head.size, data.len());
-
-        storage.delete(&location).await.unwrap();
-
-        let content_list = flatten_list_stream(storage, None).await.unwrap();
-        assert!(content_list.is_empty());
-
-        let err = storage.get(&location).await.unwrap_err();
-        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
-        let err = storage.head(&location).await.unwrap_err();
-        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
-        // Test handling of paths containing an encoded delimiter
-
-        let file_with_delimiter = Path::from_iter(["a", "b/c", "foo.file"]);
-        storage
-            .put(&file_with_delimiter, "arbitrary".into())
-            .await
-            .unwrap();
-
-        let files = flatten_list_stream(storage, None).await.unwrap();
-        assert_eq!(files, vec![file_with_delimiter.clone()]);
-
-        let files = flatten_list_stream(storage, Some(&Path::from("a/b")))
-            .await
-            .unwrap();
-        assert!(files.is_empty());
-
-        let files = storage
-            .list_with_delimiter(Some(&Path::from("a/b")))
-            .await
-            .unwrap();
-        assert!(files.common_prefixes.is_empty());
-        assert!(files.objects.is_empty());
-
-        let files = storage
-            .list_with_delimiter(Some(&Path::from("a")))
-            .await
-            .unwrap();
-        assert_eq!(files.common_prefixes, vec![Path::from_iter(["a", "b/c"])]);
-        assert!(files.objects.is_empty());
-
-        let files = storage
-            .list_with_delimiter(Some(&Path::from_iter(["a", "b/c"])))
-            .await
-            .unwrap();
-        assert!(files.common_prefixes.is_empty());
-        assert_eq!(files.objects.len(), 1);
-        assert_eq!(files.objects[0].location, file_with_delimiter);
-
-        storage.delete(&file_with_delimiter).await.unwrap();
-
-        // Test handling of paths containing non-ASCII characters, e.g. emoji
-
-        let emoji_prefix = Path::from("🙀");
-        let emoji_file = Path::from("🙀/😀.parquet");
-        storage.put(&emoji_file, "arbitrary".into()).await.unwrap();
-
-        storage.head(&emoji_file).await.unwrap();
-        storage
-            .get(&emoji_file)
-            .await
-            .unwrap()
-            .bytes()
-            .await
-            .unwrap();
-
-        let files = flatten_list_stream(storage, Some(&emoji_prefix))
-            .await
-            .unwrap();
-
-        assert_eq!(files, vec![emoji_file.clone()]);
-
-        let dst = Path::from("foo.parquet");
-        storage.copy(&emoji_file, &dst).await.unwrap();
-        let mut files = flatten_list_stream(storage, None).await.unwrap();
-        files.sort_unstable();
-        assert_eq!(files, vec![emoji_file.clone(), dst.clone()]);
-
-        let dst2 = Path::from("new/nested/foo.parquet");
-        storage.copy(&emoji_file, &dst2).await.unwrap();
-        let mut files = flatten_list_stream(storage, None).await.unwrap();
-        files.sort_unstable();
-        assert_eq!(files, vec![emoji_file.clone(), dst.clone(), dst2.clone()]);
-
-        let dst3 = Path::from("new/nested2/bar.parquet");
-        storage.rename(&dst, &dst3).await.unwrap();
-        let mut files = flatten_list_stream(storage, None).await.unwrap();
-        files.sort_unstable();
-        assert_eq!(files, vec![emoji_file.clone(), dst2.clone(), 
dst3.clone()]);
-
-        let err = storage.head(&dst).await.unwrap_err();
-        assert!(matches!(err, Error::NotFound { .. }));
-
-        storage.delete(&emoji_file).await.unwrap();
-        storage.delete(&dst3).await.unwrap();
-        storage.delete(&dst2).await.unwrap();
-        let files = flatten_list_stream(storage, Some(&emoji_prefix))
-            .await
-            .unwrap();
-        assert!(files.is_empty());
-
-        // Test handling of paths containing percent-encoded sequences
-
-        // "HELLO" percent encoded
-        let hello_prefix = Path::parse("%48%45%4C%4C%4F").unwrap();
-        let path = hello_prefix.child("foo.parquet");
-
-        storage.put(&path, vec![0, 1].into()).await.unwrap();
-        let files = flatten_list_stream(storage, Some(&hello_prefix))
-            .await
-            .unwrap();
-        assert_eq!(files, vec![path.clone()]);
-
-        // Cannot list by decoded representation
-        let files = flatten_list_stream(storage, Some(&Path::from("HELLO")))
-            .await
-            .unwrap();
-        assert!(files.is_empty());
-
-        // Cannot access by decoded representation
-        let err = storage
-            .head(&Path::from("HELLO/foo.parquet"))
-            .await
-            .unwrap_err();
-        assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
-        storage.delete(&path).await.unwrap();
-
-        // Test handling of unicode paths
-        let path = Path::parse("🇦🇺/$shenanigans@@~.txt").unwrap();
-        storage.put(&path, "test".into()).await.unwrap();
-
-        let r = storage.get(&path).await.unwrap();
-        assert_eq!(r.bytes().await.unwrap(), "test");
-
-        let dir = Path::parse("🇦🇺").unwrap();
-        let r = storage.list_with_delimiter(None).await.unwrap();
-        assert!(r.common_prefixes.contains(&dir));
-
-        let r = storage.list_with_delimiter(Some(&dir)).await.unwrap();
-        assert_eq!(r.objects.len(), 1);
-        assert_eq!(r.objects[0].location, path);
-
-        storage.delete(&path).await.unwrap();
-
-        // Can also write non-percent encoded sequences
-        let path = Path::parse("%Q.parquet").unwrap();
-        storage.put(&path, vec![0, 1].into()).await.unwrap();
-
-        let files = flatten_list_stream(storage, None).await.unwrap();
-        assert_eq!(files, vec![path.clone()]);
-
-        storage.delete(&path).await.unwrap();
-
-        let path = Path::parse("foo bar/I contain spaces.parquet").unwrap();
-        storage.put(&path, vec![0, 1].into()).await.unwrap();
-        storage.head(&path).await.unwrap();
-
-        let files = flatten_list_stream(storage, Some(&Path::from("foo bar")))
-            .await
-            .unwrap();
-        assert_eq!(files, vec![path.clone()]);
-
-        storage.delete(&path).await.unwrap();
-
-        let files = flatten_list_stream(storage, None).await.unwrap();
-        assert!(files.is_empty(), "{files:?}");
-
-        // Test list order
-        let files = vec![
-            Path::from("a a/b.file"),
-            Path::parse("a%2Fa.file").unwrap(),
-            Path::from("a/😀.file"),
-            Path::from("a/a file"),
-            Path::parse("a/a%2F.file").unwrap(),
-            Path::from("a/a.file"),
-            Path::from("a/a/b.file"),
-            Path::from("a/b.file"),
-            Path::from("aa/a.file"),
-            Path::from("ab/a.file"),
-        ];
-
-        for file in &files {
-            storage.put(file, "foo".into()).await.unwrap();
-        }
-
-        let cases = [
-            (None, Path::from("a")),
-            (None, Path::from("a/a file")),
-            (None, Path::from("a/a/b.file")),
-            (None, Path::from("ab/a.file")),
-            (None, Path::from("a%2Fa.file")),
-            (None, Path::from("a/😀.file")),
-            (Some(Path::from("a")), Path::from("")),
-            (Some(Path::from("a")), Path::from("a")),
-            (Some(Path::from("a")), Path::from("a/😀")),
-            (Some(Path::from("a")), Path::from("a/😀.file")),
-            (Some(Path::from("a")), Path::from("a/b")),
-            (Some(Path::from("a")), Path::from("a/a/b.file")),
-        ];
-
-        for (prefix, offset) in cases {
-            let s = storage.list_with_offset(prefix.as_ref(), &offset);
-            let mut actual: Vec<_> = s.map_ok(|x| 
x.location).try_collect().await.unwrap();
-
-            actual.sort_unstable();
-
-            let expected: Vec<_> = files
-                .iter()
-                .filter(|x| {
-                    let prefix_match = prefix.as_ref().map(|p| 
x.prefix_matches(p)).unwrap_or(true);
-                    prefix_match && *x > &offset
-                })
-                .cloned()
-                .collect();
-
-            assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
-        }
-
-        // Test bulk delete
-        let paths = vec![
-            Path::from("a/a.file"),
-            Path::from("a/a/b.file"),
-            Path::from("aa/a.file"),
-            Path::from("does_not_exist"),
-            Path::from("I'm a < & weird path"),
-            Path::from("ab/a.file"),
-            Path::from("a/😀.file"),
-        ];
-
-        storage.put(&paths[4], "foo".into()).await.unwrap();
-
-        let out_paths = storage
-            
.delete_stream(futures::stream::iter(paths.clone()).map(Ok).boxed())
-            .collect::<Vec<_>>()
-            .await;
-
-        assert_eq!(out_paths.len(), paths.len());
-
-        let expect_errors = [3];
-
-        for (i, input_path) in paths.iter().enumerate() {
-            let err = storage.head(input_path).await.unwrap_err();
-            assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
-            if expect_errors.contains(&i) {
-                // Some object stores will report NotFound, but others (such 
as S3) will
-                // report success regardless.
-                match &out_paths[i] {
-                    Err(Error::NotFound { path: out_path, .. }) => {
-                        assert!(out_path.ends_with(&input_path.to_string()));
-                    }
-                    Ok(out_path) => {
-                        assert_eq!(out_path, input_path);
-                    }
-                    _ => panic!("unexpected error"),
-                }
-            } else {
-                assert_eq!(out_paths[i].as_ref().unwrap(), input_path);
-            }
-        }
-
-        delete_fixtures(storage).await;
-
-        let path = Path::from("empty");
-        storage.put(&path, PutPayload::default()).await.unwrap();
-        let meta = storage.head(&path).await.unwrap();
-        assert_eq!(meta.size, 0);
-        let data = storage.get(&path).await.unwrap().bytes().await.unwrap();
-        assert_eq!(data.len(), 0);
-
-        storage.delete(&path).await.unwrap();
-    }
-
-    pub(crate) async fn put_get_attributes(integration: &dyn ObjectStore) {
-        // Test handling of attributes
-        let attributes = Attributes::from_iter([
-            (Attribute::CacheControl, "max-age=604800"),
-            (
-                Attribute::ContentDisposition,
-                r#"attachment; filename="test.html""#,
-            ),
-            (Attribute::ContentEncoding, "gzip"),
-            (Attribute::ContentLanguage, "en-US"),
-            (Attribute::ContentType, "text/html; charset=utf-8"),
-        ]);
-
-        let path = Path::from("attributes");
-        let opts = attributes.clone().into();
-        match integration.put_opts(&path, "foo".into(), opts).await {
-            Ok(_) => {
-                let r = integration.get(&path).await.unwrap();
-                assert_eq!(r.attributes, attributes);
-            }
-            Err(Error::NotImplemented) => {}
-            Err(e) => panic!("{e}"),
-        }
-
-        let opts = attributes.clone().into();
-        match integration.put_multipart_opts(&path, opts).await {
-            Ok(mut w) => {
-                w.put_part("foo".into()).await.unwrap();
-                w.complete().await.unwrap();
-
-                let r = integration.get(&path).await.unwrap();
-                assert_eq!(r.attributes, attributes);
-            }
-            Err(Error::NotImplemented) => {}
-            Err(e) => panic!("{e}"),
-        }
-    }
-
-    pub(crate) async fn get_opts(storage: &dyn ObjectStore) {
-        let path = Path::from("test");
-        storage.put(&path, "foo".into()).await.unwrap();
-        let meta = storage.head(&path).await.unwrap();
-
-        let options = GetOptions {
-            if_unmodified_since: Some(meta.last_modified),
-            ..GetOptions::default()
-        };
-        match storage.get_opts(&path, options).await {
-            Ok(_) | Err(Error::NotSupported { .. }) => {}
-            Err(e) => panic!("{e}"),
-        }
-
-        let options = GetOptions {
-            if_unmodified_since: Some(
-                meta.last_modified + chrono::Duration::try_hours(10).unwrap(),
-            ),
-            ..GetOptions::default()
-        };
-        match storage.get_opts(&path, options).await {
-            Ok(_) | Err(Error::NotSupported { .. }) => {}
-            Err(e) => panic!("{e}"),
-        }
-
-        let options = GetOptions {
-            if_unmodified_since: Some(
-                meta.last_modified - chrono::Duration::try_hours(10).unwrap(),
-            ),
-            ..GetOptions::default()
-        };
-        match storage.get_opts(&path, options).await {
-            Err(Error::Precondition { .. } | Error::NotSupported { .. }) => {}
-            d => panic!("{d:?}"),
-        }
-
-        let options = GetOptions {
-            if_modified_since: Some(meta.last_modified),
-            ..GetOptions::default()
-        };
-        match storage.get_opts(&path, options).await {
-            Err(Error::NotModified { .. } | Error::NotSupported { .. }) => {}
-            d => panic!("{d:?}"),
-        }
-
-        let options = GetOptions {
-            if_modified_since: Some(meta.last_modified - 
chrono::Duration::try_hours(10).unwrap()),
-            ..GetOptions::default()
-        };
-        match storage.get_opts(&path, options).await {
-            Ok(_) | Err(Error::NotSupported { .. }) => {}
-            Err(e) => panic!("{e}"),
-        }
-
-        let tag = meta.e_tag.unwrap();
-        let options = GetOptions {
-            if_match: Some(tag.clone()),
-            ..GetOptions::default()
-        };
-        storage.get_opts(&path, options).await.unwrap();
-
-        let options = GetOptions {
-            if_match: Some("invalid".to_string()),
-            ..GetOptions::default()
-        };
-        let err = storage.get_opts(&path, options).await.unwrap_err();
-        assert!(matches!(err, Error::Precondition { .. }), "{err}");
-
-        let options = GetOptions {
-            if_none_match: Some(tag.clone()),
-            ..GetOptions::default()
-        };
-        let err = storage.get_opts(&path, options).await.unwrap_err();
-        assert!(matches!(err, Error::NotModified { .. }), "{err}");
-
-        let options = GetOptions {
-            if_none_match: Some("invalid".to_string()),
-            ..GetOptions::default()
-        };
-        storage.get_opts(&path, options).await.unwrap();
-
-        let result = storage.put(&path, "test".into()).await.unwrap();
-        let new_tag = result.e_tag.unwrap();
-        assert_ne!(tag, new_tag);
-
-        let meta = storage.head(&path).await.unwrap();
-        assert_eq!(meta.e_tag.unwrap(), new_tag);
-
-        let options = GetOptions {
-            if_match: Some(new_tag),
-            ..GetOptions::default()
-        };
-        storage.get_opts(&path, options).await.unwrap();
-
-        let options = GetOptions {
-            if_match: Some(tag),
-            ..GetOptions::default()
-        };
-        let err = storage.get_opts(&path, options).await.unwrap_err();
-        assert!(matches!(err, Error::Precondition { .. }), "{err}");
-
-        if let Some(version) = meta.version {
-            storage.put(&path, "bar".into()).await.unwrap();
-
-            let options = GetOptions {
-                version: Some(version),
-                ..GetOptions::default()
-            };
-
-            // Can retrieve previous version
-            let get_opts = storage.get_opts(&path, options).await.unwrap();
-            let old = get_opts.bytes().await.unwrap();
-            assert_eq!(old, b"test".as_slice());
-
-            // Current version contains the updated data
-            let current = 
storage.get(&path).await.unwrap().bytes().await.unwrap();
-            assert_eq!(&current, b"bar".as_slice());
-        }
-    }
-
-    pub(crate) async fn put_opts(storage: &dyn ObjectStore, supports_update: 
bool) {
-        // When using DynamoCommit repeated runs of this test will produce the 
same sequence of records in DynamoDB
-        // As a result each conditional operation will need to wait for the 
lease to timeout before proceeding
-        // One solution would be to clear DynamoDB before each test, but this 
would require non-trivial additional code
-        // so we instead just generate a random suffix for the filenames
-        let rng = thread_rng();
-        let suffix = 
String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
-
-        delete_fixtures(storage).await;
-        let path = Path::from(format!("put_opts_{suffix}"));
-        let v1 = storage
-            .put_opts(&path, "a".into(), PutMode::Create.into())
-            .await
-            .unwrap();
-
-        let err = storage
-            .put_opts(&path, "b".into(), PutMode::Create.into())
-            .await
-            .unwrap_err();
-        assert!(matches!(err, Error::AlreadyExists { .. }), "{err}");
-
-        let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
-        assert_eq!(b.as_ref(), b"a");
-
-        if !supports_update {
-            return;
-        }
-
-        let v2 = storage
-            .put_opts(&path, "c".into(), 
PutMode::Update(v1.clone().into()).into())
-            .await
-            .unwrap();
-
-        let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
-        assert_eq!(b.as_ref(), b"c");
-
-        let err = storage
-            .put_opts(&path, "d".into(), PutMode::Update(v1.into()).into())
-            .await
-            .unwrap_err();
-        assert!(matches!(err, Error::Precondition { .. }), "{err}");
-
-        storage
-            .put_opts(&path, "e".into(), 
PutMode::Update(v2.clone().into()).into())
-            .await
-            .unwrap();
-
-        let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
-        assert_eq!(b.as_ref(), b"e");
-
-        // Update not exists
-        let path = Path::from("I don't exist");
-        let err = storage
-            .put_opts(&path, "e".into(), PutMode::Update(v2.into()).into())
-            .await
-            .unwrap_err();
-        assert!(matches!(err, Error::Precondition { .. }), "{err}");
-
-        const NUM_WORKERS: usize = 5;
-        const NUM_INCREMENTS: usize = 10;
-
-        let path = Path::from(format!("RACE-{suffix}"));
-        let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
-            .map(|_| async {
-                for _ in 0..NUM_INCREMENTS {
-                    loop {
-                        match storage.get(&path).await {
-                            Ok(r) => {
-                                let mode = PutMode::Update(UpdateVersion {
-                                    e_tag: r.meta.e_tag.clone(),
-                                    version: r.meta.version.clone(),
-                                });
-
-                                let b = r.bytes().await.unwrap();
-                                let v: usize = 
std::str::from_utf8(&b).unwrap().parse().unwrap();
-                                let new = (v + 1).to_string();
-
-                                match storage.put_opts(&path, new.into(), 
mode.into()).await {
-                                    Ok(_) => break,
-                                    Err(Error::Precondition { .. }) => 
continue,
-                                    Err(e) => return Err(e),
-                                }
-                            }
-                            Err(Error::NotFound { .. }) => {
-                                let mode = PutMode::Create;
-                                match storage.put_opts(&path, "1".into(), 
mode.into()).await {
-                                    Ok(_) => break,
-                                    Err(Error::AlreadyExists { .. }) => 
continue,
-                                    Err(e) => return Err(e),
-                                }
-                            }
-                            Err(e) => return Err(e),
-                        }
-                    }
-                }
-                Ok(())
-            })
-            .collect();
-
-        while futures.next().await.transpose().unwrap().is_some() {}
-        let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
-        let v = std::str::from_utf8(&b).unwrap().parse::<usize>().unwrap();
-        assert_eq!(v, NUM_WORKERS * NUM_INCREMENTS);
-    }
-
-    /// Returns a chunk of length `chunk_length`
-    fn get_chunk(chunk_length: usize) -> Bytes {
-        let mut data = vec![0_u8; chunk_length];
-        let mut rng = thread_rng();
-        // Set a random selection of bytes
-        for _ in 0..1000 {
-            data[rng.gen_range(0..chunk_length)] = rng.gen();
-        }
-        data.into()
-    }
-
-    /// Returns `num_chunks` of length `chunks`
-    fn get_chunks(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
-        (0..num_chunks).map(|_| get_chunk(chunk_length)).collect()
-    }
-
-    pub(crate) async fn stream_get(storage: &DynObjectStore) {
-        let location = Path::from("test_dir/test_upload_file.txt");
-
-        // Can write to storage
-        let data = get_chunks(5 * 1024 * 1024, 3);
-        let bytes_expected = data.concat();
-        let mut upload = storage.put_multipart(&location).await.unwrap();
-        let uploads = data.into_iter().map(|x| upload.put_part(x.into()));
-        futures::future::try_join_all(uploads).await.unwrap();
-
-        // Object should not yet exist in store
-        let meta_res = storage.head(&location).await;
-        assert!(meta_res.is_err());
-        assert!(matches!(
-            meta_res.unwrap_err(),
-            crate::Error::NotFound { .. }
-        ));
-
-        let files = flatten_list_stream(storage, None).await.unwrap();
-        assert_eq!(&files, &[]);
-
-        let result = storage.list_with_delimiter(None).await.unwrap();
-        assert_eq!(&result.objects, &[]);
-
-        upload.complete().await.unwrap();
-
-        let bytes_written = 
storage.get(&location).await.unwrap().bytes().await.unwrap();
-        assert_eq!(bytes_expected, bytes_written);
-
-        // Can overwrite some storage
-        // Sizes chosen to ensure we write three parts
-        let data = get_chunks(3_200_000, 7);
-        let bytes_expected = data.concat();
-        let upload = storage.put_multipart(&location).await.unwrap();
-        let mut writer = WriteMultipart::new(upload);
-        for chunk in &data {
-            writer.write(chunk)
-        }
-        writer.finish().await.unwrap();
-        let bytes_written = 
storage.get(&location).await.unwrap().bytes().await.unwrap();
-        assert_eq!(bytes_expected, bytes_written);
-
-        // We can abort an empty write
-        let location = Path::from("test_dir/test_abort_upload.txt");
-        let mut upload = storage.put_multipart(&location).await.unwrap();
-        upload.abort().await.unwrap();
-        let get_res = storage.get(&location).await;
-        assert!(get_res.is_err());
-        assert!(matches!(
-            get_res.unwrap_err(),
-            crate::Error::NotFound { .. }
-        ));
-
-        // We can abort an in-progress write
-        let mut upload = storage.put_multipart(&location).await.unwrap();
-        upload
-            .put_part(data.first().unwrap().clone().into())
-            .await
-            .unwrap();
-
-        upload.abort().await.unwrap();
-        let get_res = storage.get(&location).await;
-        assert!(get_res.is_err());
-        assert!(matches!(
-            get_res.unwrap_err(),
-            crate::Error::NotFound { .. }
-        ));
-    }
-
-    pub(crate) async fn list_uses_directories_correctly(storage: 
&DynObjectStore) {
-        delete_fixtures(storage).await;
-
-        let content_list = flatten_list_stream(storage, None).await.unwrap();
-        assert!(
-            content_list.is_empty(),
-            "Expected list to be empty; found: {content_list:?}"
-        );
-
-        let location1 = Path::from("foo/x.json");
-        let location2 = Path::from("foo.bar/y.json");
-
-        let data = PutPayload::from("arbitrary data");
-        storage.put(&location1, data.clone()).await.unwrap();
-        storage.put(&location2, data).await.unwrap();
-
-        let prefix = Path::from("foo");
-        let content_list = flatten_list_stream(storage, 
Some(&prefix)).await.unwrap();
-        assert_eq!(content_list, &[location1.clone()]);
-
-        let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
-        assert_eq!(result.objects.len(), 1);
-        assert_eq!(result.objects[0].location, location1);
-        assert_eq!(result.common_prefixes, &[]);
-
-        // Listing an existing path (file) should return an empty list:
-        // https://github.com/apache/arrow-rs/issues/3712
-        let content_list = flatten_list_stream(storage, Some(&location1))
-            .await
-            .unwrap();
-        assert_eq!(content_list, &[]);
-
-        let list = 
storage.list_with_delimiter(Some(&location1)).await.unwrap();
-        assert_eq!(list.objects, &[]);
-        assert_eq!(list.common_prefixes, &[]);
-
-        let prefix = Path::from("foo/x");
-        let content_list = flatten_list_stream(storage, 
Some(&prefix)).await.unwrap();
-        assert_eq!(content_list, &[]);
-
-        let list = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
-        assert_eq!(list.objects, &[]);
-        assert_eq!(list.common_prefixes, &[]);
-    }
-
-    pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) {
-        delete_fixtures(storage).await;
-
-        // ==================== check: store is empty ====================
-        let content_list = flatten_list_stream(storage, None).await.unwrap();
-        assert!(content_list.is_empty());
-
-        // ==================== do: create files ====================
-        let data = Bytes::from("arbitrary data");
-
-        let files: Vec<_> = [
-            "test_file",
-            "mydb/wb/000/000/000.segment",
-            "mydb/wb/000/000/001.segment",
-            "mydb/wb/000/000/002.segment",
-            "mydb/wb/001/001/000.segment",
-            "mydb/wb/foo.json",
-            "mydb/wbwbwb/111/222/333.segment",
-            "mydb/data/whatevs",
-        ]
-        .iter()
-        .map(|&s| Path::from(s))
-        .collect();
-
-        for f in &files {
-            storage.put(f, data.clone().into()).await.unwrap();
-        }
-
-        // ==================== check: prefix-list `mydb/wb` (directory) 
====================
-        let prefix = Path::from("mydb/wb");
-
-        let expected_000 = Path::from("mydb/wb/000");
-        let expected_001 = Path::from("mydb/wb/001");
-        let expected_location = Path::from("mydb/wb/foo.json");
-
-        let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
-
-        assert_eq!(result.common_prefixes, vec![expected_000, expected_001]);
-        assert_eq!(result.objects.len(), 1);
-
-        let object = &result.objects[0];
-
-        assert_eq!(object.location, expected_location);
-        assert_eq!(object.size, data.len());
-
-        // ==================== check: prefix-list `mydb/wb/000/000/001` 
(partial filename doesn't match) ====================
-        let prefix = Path::from("mydb/wb/000/000/001");
-
-        let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
-        assert!(result.common_prefixes.is_empty());
-        assert_eq!(result.objects.len(), 0);
-
-        // ==================== check: prefix-list `not_there` (non-existing 
prefix) ====================
-        let prefix = Path::from("not_there");
-
-        let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
-        assert!(result.common_prefixes.is_empty());
-        assert!(result.objects.is_empty());
-
-        // ==================== do: remove all files ====================
-        for f in &files {
-            storage.delete(f).await.unwrap();
-        }
-
-        // ==================== check: store is empty ====================
-        let content_list = flatten_list_stream(storage, None).await.unwrap();
-        assert!(content_list.is_empty());
-    }
-
-    pub(crate) async fn get_nonexistent_object(
-        storage: &DynObjectStore,
-        location: Option<Path>,
-    ) -> crate::Result<Bytes> {
-        let location = location.unwrap_or_else(|| 
Path::from("this_file_should_not_exist"));
-
-        let err = storage.head(&location).await.unwrap_err();
-        assert!(matches!(err, crate::Error::NotFound { .. }));
-
-        storage.get(&location).await?.bytes().await
-    }
-
-    pub(crate) async fn rename_and_copy(storage: &DynObjectStore) {
-        // Create two objects
-        let path1 = Path::from("test1");
-        let path2 = Path::from("test2");
-        let contents1 = Bytes::from("cats");
-        let contents2 = Bytes::from("dogs");
-
-        // copy() make both objects identical
-        storage.put(&path1, contents1.clone().into()).await.unwrap();
-        storage.put(&path2, contents2.clone().into()).await.unwrap();
-        storage.copy(&path1, &path2).await.unwrap();
-        let new_contents = 
storage.get(&path2).await.unwrap().bytes().await.unwrap();
-        assert_eq!(&new_contents, &contents1);
-
-        // rename() copies contents and deletes original
-        storage.put(&path1, contents1.clone().into()).await.unwrap();
-        storage.put(&path2, contents2.clone().into()).await.unwrap();
-        storage.rename(&path1, &path2).await.unwrap();
-        let new_contents = 
storage.get(&path2).await.unwrap().bytes().await.unwrap();
-        assert_eq!(&new_contents, &contents1);
-        let result = storage.get(&path1).await;
-        assert!(result.is_err());
-        assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
-
-        // Clean up
-        storage.delete(&path2).await.unwrap();
-    }
-
-    pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) {
-        // Create two objects
-        let path1 = Path::from("test1");
-        let path2 = Path::from("not_exists_nested/test2");
-        let contents1 = Bytes::from("cats");
-        let contents2 = Bytes::from("dogs");
-
-        // copy_if_not_exists() errors if destination already exists
-        storage.put(&path1, contents1.clone().into()).await.unwrap();
-        storage.put(&path2, contents2.clone().into()).await.unwrap();
-        let result = storage.copy_if_not_exists(&path1, &path2).await;
-        assert!(result.is_err());
-        assert!(matches!(
-            result.unwrap_err(),
-            crate::Error::AlreadyExists { .. }
-        ));
-
-        // copy_if_not_exists() copies contents and allows deleting original
-        storage.delete(&path2).await.unwrap();
-        storage.copy_if_not_exists(&path1, &path2).await.unwrap();
-        storage.delete(&path1).await.unwrap();
-        let new_contents = 
storage.get(&path2).await.unwrap().bytes().await.unwrap();
-        assert_eq!(&new_contents, &contents1);
-        let result = storage.get(&path1).await;
-        assert!(result.is_err());
-        assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
-
-        // Clean up
-        storage.delete(&path2).await.unwrap();
-    }
-
-    pub(crate) async fn copy_rename_nonexistent_object(storage: 
&DynObjectStore) {
-        // Create empty source object
-        let path1 = Path::from("test1");
-
-        // Create destination object
-        let path2 = Path::from("test2");
-        storage.put(&path2, "hello".into()).await.unwrap();
-
-        // copy() errors if source does not exist
-        let result = storage.copy(&path1, &path2).await;
-        assert!(result.is_err());
-        assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
-
-        // rename() errors if source does not exist
-        let result = storage.rename(&path1, &path2).await;
-        assert!(result.is_err());
-        assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
-
-        // copy_if_not_exists() errors if source does not exist
-        let result = storage.copy_if_not_exists(&path1, &path2).await;
-        assert!(result.is_err());
-        assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
-
-        // Clean up
-        storage.delete(&path2).await.unwrap();
-    }
-
-    pub(crate) async fn multipart(storage: &dyn ObjectStore, multipart: &dyn 
MultipartStore) {
-        let path = Path::from("test_multipart");
-        let chunk_size = 5 * 1024 * 1024;
-
-        let chunks = get_chunks(chunk_size, 2);
-
-        let id = multipart.create_multipart(&path).await.unwrap();
-
-        let parts: Vec<_> = futures::stream::iter(chunks)
-            .enumerate()
-            .map(|(idx, b)| multipart.put_part(&path, &id, idx, b.into()))
-            .buffered(2)
-            .try_collect()
-            .await
-            .unwrap();
-
-        multipart
-            .complete_multipart(&path, &id, parts)
-            .await
-            .unwrap();
-
-        let meta = storage.head(&path).await.unwrap();
-        assert_eq!(meta.size, chunk_size * 2);
-
-        // Empty case
-        let path = Path::from("test_empty_multipart");
-
-        let id = multipart.create_multipart(&path).await.unwrap();
-
-        let parts = vec![];
-
-        multipart
-            .complete_multipart(&path, &id, parts)
-            .await
-            .unwrap();
-
-        let meta = storage.head(&path).await.unwrap();
-        assert_eq!(meta.size, 0);
+    /// Test that the returned stream does not borrow the lifetime of Path
+    fn list_store<'a>(
+        store: &'a dyn ObjectStore,
+        path_str: &str,
+    ) -> BoxStream<'a, Result<ObjectMeta>> {
+        let path = Path::from(path_str);
+        store.list(Some(&path))
     }
 
     #[cfg(any(feature = "azure", feature = "aws"))]
-    pub(crate) async fn signing<T>(integration: &T)
+    pub async fn signing<T>(integration: &T)
     where
-        T: ObjectStore + crate::signer::Signer,
+        T: ObjectStore + signer::Signer,
     {
         use reqwest::Method;
         use std::time::Duration;
@@ -2367,7 +1337,7 @@ mod tests {
     }
 
     #[cfg(any(feature = "aws", feature = "azure"))]
-    pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
+    pub async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
     where
         F: Fn(Path) -> Fut + Send + Sync,
         Fut: std::future::Future<Output = Result<reqwest::Response>> + Send,
@@ -2444,24 +1414,6 @@ mod tests {
         }
     }
 
-    async fn delete_fixtures(storage: &DynObjectStore) {
-        let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
-        storage
-            .delete_stream(paths)
-            .try_collect::<Vec<_>>()
-            .await
-            .unwrap();
-    }
-
-    /// Test that the returned stream does not borrow the lifetime of Path
-    fn list_store<'a>(
-        store: &'a dyn ObjectStore,
-        path_str: &str,
-    ) -> BoxStream<'a, Result<ObjectMeta>> {
-        let path = Path::from(path_str);
-        store.list(Some(&path))
-    }
-
     #[tokio::test]
     async fn test_list_lifetimes() {
         let store = memory::InMemory::new();
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index f3e1d4296fe..64b96ad1a96 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -273,9 +273,9 @@ impl MultipartUpload for LimitUpload {
 
 #[cfg(test)]
 mod tests {
+    use crate::integration::*;
     use crate::limit::LimitStore;
     use crate::memory::InMemory;
-    use crate::tests::*;
     use crate::ObjectStore;
     use futures::stream::StreamExt;
     use std::pin::Pin;
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 8dec5bee0a2..95b50d6743f 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -1005,8 +1005,7 @@ mod tests {
     use futures::TryStreamExt;
     use tempfile::{NamedTempFile, TempDir};
 
-    use crate::test_util::flatten_list_stream;
-    use crate::tests::*;
+    use crate::integration::*;
 
     use super::*;
 
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index daf14e17510..0d72983b049 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -528,7 +528,7 @@ impl MultipartUpload for InMemoryUpload {
 
 #[cfg(test)]
 mod tests {
-    use crate::tests::*;
+    use crate::integration::*;
 
     use super::*;
 
diff --git a/object_store/src/parse.rs b/object_store/src/parse.rs
index 5549fd3a3e5..e5d5149fb7a 100644
--- a/object_store/src/parse.rs
+++ b/object_store/src/parse.rs
@@ -25,15 +25,9 @@ use url::Url;
 
 #[derive(Debug, Snafu)]
 enum Error {
-    #[snafu(display("Unable to convert URL \"{}\" to filesystem path", url))]
-    InvalidUrl { url: Url },
-
     #[snafu(display("Unable to recognise URL \"{}\"", url))]
     Unrecognised { url: Url },
 
-    #[snafu(display("Feature {scheme:?} not enabled"))]
-    NotEnabled { scheme: ObjectStoreScheme },
-
     #[snafu(context(false))]
     Path { source: crate::path::Error },
 }
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 7c9ea5804c3..9b10fea5e0b 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -204,9 +204,8 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::integration::*;
     use crate::local::LocalFileSystem;
-    use crate::test_util::flatten_list_stream;
-    use crate::tests::*;
 
     use tempfile::TempDir;
 
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index 38b6d7c3bf4..d07276c3dca 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -398,7 +398,7 @@ impl MultipartUpload for ThrottledUpload {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{memory::InMemory, tests::*, GetResultPayload};
+    use crate::{integration::*, memory::InMemory, GetResultPayload};
     use futures::TryStreamExt;
     use tokio::time::Duration;
     use tokio::time::Instant;

Reply via email to