This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 520ad68beec Export object_store integration tests (#5709)
520ad68beec is described below
commit 520ad68beeceb8888e926d43218a851c4ef65ce3
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Sat May 4 22:13:25 2024 +0100
Export object_store integration tests (#5709)
* Export object_store integration tests
* Clippy
* Clippy
* Even more clippy
* Format
---
object_store/Cargo.toml | 1 +
object_store/src/aws/builder.rs | 12 -
object_store/src/aws/client.rs | 6 -
object_store/src/aws/dynamo.rs | 12 -
object_store/src/aws/mod.rs | 18 +-
object_store/src/azure/builder.rs | 5 -
object_store/src/azure/client.rs | 11 -
object_store/src/azure/mod.rs | 3 +-
object_store/src/chunked.rs | 2 +-
object_store/src/client/list.rs | 1 +
object_store/src/gcp/builder.rs | 5 -
object_store/src/gcp/client.rs | 6 -
object_store/src/gcp/credential.rs | 2 +-
object_store/src/gcp/mod.rs | 15 +-
object_store/src/http/mod.rs | 6 +-
object_store/src/integration.rs | 1082 ++++++++++++++++++++++++++++++++++++
object_store/src/lib.rs | 1082 +-----------------------------------
object_store/src/limit.rs | 2 +-
object_store/src/local.rs | 3 +-
object_store/src/memory.rs | 2 +-
object_store/src/parse.rs | 6 -
object_store/src/prefix.rs | 3 +-
object_store/src/throttle.rs | 2 +-
23 files changed, 1130 insertions(+), 1157 deletions(-)
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index ca1e5b32703..c61946f7f83 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -66,6 +66,7 @@ gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud", "md-5"]
http = ["cloud"]
tls-webpki-roots = ["reqwest?/rustls-tls-webpki-roots"]
+integration = []
[dev-dependencies] # In alphabetical order
futures-test = "0.3"
diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs
index 664e1836460..1e42093fd8a 100644
--- a/object_store/src/aws/builder.rs
+++ b/object_store/src/aws/builder.rs
@@ -70,21 +70,9 @@ enum Error {
#[snafu(display("Configuration key: '{}' is not known.", key))]
UnknownConfigurationKey { key: String },
- #[snafu(display("Bucket '{}' not found", bucket))]
- BucketNotFound { bucket: String },
-
- #[snafu(display("Failed to resolve region for bucket '{}'", bucket))]
- ResolveRegion {
- bucket: String,
- source: reqwest::Error,
- },
-
#[snafu(display("Invalid Zone suffix for bucket '{bucket}'"))]
ZoneSuffix { bucket: String },
- #[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
- RegionParse { bucket: String },
-
#[snafu(display("Invalid encryption type: {}. Valid values are \"AES256\",
\"sse:kms\", and \"sse:kms:dsse\".", passed))]
InvalidEncryptionType { passed: String },
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 24247688e86..98226c4b6b1 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -66,12 +66,6 @@ const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub(crate) enum Error {
- #[snafu(display("Error fetching get response body {}: {}", path, source))]
- GetResponseBody {
- source: reqwest::Error,
- path: String,
- },
-
#[snafu(display("Error performing DeleteObjects request: {}", source))]
DeleteObjectsRequest { source: crate::client::retry::Error },
diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs
index 2390187e7f7..9de67e5aaa4 100644
--- a/object_store/src/aws/dynamo.rs
+++ b/object_store/src/aws/dynamo.rs
@@ -451,18 +451,6 @@ struct PutItem<'a> {
return_values_on_condition_check_failure: Option<ReturnValues>,
}
-/// A DynamoDB [GetItem] payload
-///
-/// [GetItem]:
https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_GetItem.html
-#[derive(Serialize)]
-#[serde(rename_all = "PascalCase")]
-struct GetItem<'a> {
- /// The table name
- table_name: &'a str,
- /// The primary key
- key: Map<'a, &'a str, AttributeValue<'a>>,
-}
-
#[derive(Deserialize)]
struct ErrorResponse<'a> {
#[serde(rename = "__type")]
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 5bc6d56e7c7..f5204a5365e 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -409,14 +409,16 @@ impl MultipartStore for AmazonS3 {
#[cfg(test)]
mod tests {
use super::*;
- use crate::{client::get::GetClient, tests::*};
+ use crate::client::get::GetClient;
+ use crate::integration::*;
+ use crate::tests::*;
use hyper::HeaderMap;
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[tokio::test]
async fn s3_test() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let config = AmazonS3Builder::from_env();
let integration = config.build().unwrap();
@@ -475,7 +477,7 @@ mod tests {
#[tokio::test]
async fn s3_test_get_nonexistent_location() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let integration = AmazonS3Builder::from_env().build().unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
@@ -488,7 +490,7 @@ mod tests {
#[tokio::test]
async fn s3_test_get_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let config =
AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
let integration = config.build().unwrap();
@@ -500,7 +502,7 @@ mod tests {
#[tokio::test]
async fn s3_test_put_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let config =
AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
let integration = config.build().unwrap();
@@ -513,7 +515,7 @@ mod tests {
#[tokio::test]
async fn s3_test_delete_nonexistent_location() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let integration = AmazonS3Builder::from_env().build().unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
@@ -523,7 +525,7 @@ mod tests {
#[tokio::test]
async fn s3_test_delete_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let config =
AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
let integration = config.build().unwrap();
@@ -560,7 +562,7 @@ mod tests {
}
async fn s3_encryption(store: &AmazonS3) {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let data = PutPayload::from(vec![3u8; 1024]);
diff --git a/object_store/src/azure/builder.rs
b/object_store/src/azure/builder.rs
index ee0953456ad..c0c4e8983a0 100644
--- a/object_store/src/azure/builder.rs
+++ b/object_store/src/azure/builder.rs
@@ -89,11 +89,6 @@ enum Error {
#[snafu(display("Configuration key: '{}' is not known.", key))]
UnknownConfigurationKey { key: String },
-
- #[snafu(display("Unable to extract metadata from headers: {}", source))]
- Metadata {
- source: crate::client::header::Error,
- },
}
impl From<Error> for crate::Error {
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 311bd72ff52..be760c7dd36 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -67,12 +67,6 @@ pub(crate) enum Error {
path: String,
},
- #[snafu(display("Error getting get response body {}: {}", path, source))]
- GetResponseBody {
- source: reqwest::Error,
- path: String,
- },
-
#[snafu(display("Error performing put request {}: {}", path, source))]
PutRequest {
source: crate::client::retry::Error,
@@ -94,11 +88,6 @@ pub(crate) enum Error {
#[snafu(display("Got invalid list response: {}", source))]
InvalidListResponse { source: quick_xml::de::DeError },
- #[snafu(display("Error authorizing request: {}", source))]
- Authorization {
- source: crate::azure::credential::Error,
- },
-
#[snafu(display("Unable to extract metadata from headers: {}", source))]
Metadata {
source: crate::client::header::Error,
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 755f3b1265f..f89a184f952 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -276,12 +276,13 @@ impl MultipartStore for MicrosoftAzure {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::integration::*;
use crate::tests::*;
use bytes::Bytes;
#[tokio::test]
async fn azure_blob_test() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let integration = MicrosoftAzureBuilder::from_env().build().unwrap();
put_get_delete_list(&integration).await;
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index a3bd7626787..98cc2049801 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -178,10 +178,10 @@ impl ObjectStore for ChunkedStore {
mod tests {
use futures::StreamExt;
+ use crate::integration::*;
use crate::local::LocalFileSystem;
use crate::memory::InMemory;
use crate::path::Path;
- use crate::tests::*;
use super::*;
diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs
index 371894dfeb7..2dbe20f3f51 100644
--- a/object_store/src/client/list.rs
+++ b/object_store/src/client/list.rs
@@ -48,6 +48,7 @@ pub trait ListClientExt {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>;
+ #[allow(unused)]
fn list_with_offset(
&self,
prefix: Option<&Path>,
diff --git a/object_store/src/gcp/builder.rs b/object_store/src/gcp/builder.rs
index e6da312072c..82dab14437d 100644
--- a/object_store/src/gcp/builder.rs
+++ b/object_store/src/gcp/builder.rs
@@ -60,11 +60,6 @@ enum Error {
#[snafu(display("Configuration key: '{}' is not known.", key))]
UnknownConfigurationKey { key: String },
- #[snafu(display("Unable to extract metadata from headers: {}", source))]
- Metadata {
- source: crate::client::header::Error,
- },
-
#[snafu(display("GCP credential error: {}", source))]
Credential { source: credential::Error },
}
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index a5493256546..35c64cc3d2c 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -81,12 +81,6 @@ enum Error {
#[snafu(display("Got invalid put response: {}", source))]
InvalidPutResponse { source: quick_xml::de::DeError },
- #[snafu(display("Error performing post request {}: {}", path, source))]
- PostRequest {
- source: crate::client::retry::Error,
- path: String,
- },
-
#[snafu(display("Unable to extract metadata from headers: {}", source))]
Metadata {
source: crate::client::header::Error,
diff --git a/object_store/src/gcp/credential.rs
b/object_store/src/gcp/credential.rs
index d7fc2cea2c7..829db9b1aec 100644
--- a/object_store/src/gcp/credential.rs
+++ b/object_store/src/gcp/credential.rs
@@ -536,7 +536,7 @@ impl ApplicationDefaultCredentials {
let path = Path::new(&home).join(Self::CREDENTIALS_PATH);
// It's expected for this file to not exist unless it has been
explicitly configured by the user.
- if path.try_exists().unwrap_or(false) {
+ if path.exists() {
return read_credentials_file::<Self>(path).map(Some);
}
}
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 0ec6e7e8264..039ec46b68c 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -273,6 +273,7 @@ mod test {
use credential::DEFAULT_GCS_BASE_URL;
+ use crate::integration::*;
use crate::tests::*;
use super::*;
@@ -281,7 +282,7 @@ mod test {
#[tokio::test]
async fn gcs_test() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let integration =
GoogleCloudStorageBuilder::from_env().build().unwrap();
put_get_delete_list(&integration).await;
@@ -307,7 +308,7 @@ mod test {
#[tokio::test]
#[ignore]
async fn gcs_test_sign() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let integration =
GoogleCloudStorageBuilder::from_env().build().unwrap();
let client = reqwest::Client::new();
@@ -336,7 +337,7 @@ mod test {
#[tokio::test]
async fn gcs_test_get_nonexistent_location() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let integration =
GoogleCloudStorageBuilder::from_env().build().unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
@@ -351,7 +352,7 @@ mod test {
#[tokio::test]
async fn gcs_test_get_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let config = GoogleCloudStorageBuilder::from_env();
let integration =
config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
@@ -369,7 +370,7 @@ mod test {
#[tokio::test]
async fn gcs_test_delete_nonexistent_location() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let integration =
GoogleCloudStorageBuilder::from_env().build().unwrap();
let location = Path::from_iter([NON_EXISTENT_NAME]);
@@ -383,7 +384,7 @@ mod test {
#[tokio::test]
async fn gcs_test_delete_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let config = GoogleCloudStorageBuilder::from_env();
let integration =
config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
@@ -398,7 +399,7 @@ mod test {
#[tokio::test]
async fn gcs_test_put_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let config = GoogleCloudStorageBuilder::from_env();
let integration =
config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 404211e578d..4b1c927e74f 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -64,9 +64,6 @@ enum Error {
Metadata {
source: crate::client::header::Error,
},
-
- #[snafu(display("Request error: {}", source))]
- Reqwest { source: reqwest::Error },
}
impl From<Error> for crate::Error {
@@ -249,13 +246,14 @@ impl HttpBuilder {
#[cfg(test)]
mod tests {
+ use crate::integration::*;
use crate::tests::*;
use super::*;
#[tokio::test]
async fn http_test() {
- crate::test_util::maybe_skip_integration!();
+ maybe_skip_integration!();
let url = std::env::var("HTTP_URL").expect("HTTP_URL must be set");
let options = ClientOptions::new().with_allow_http(true);
let integration = HttpBuilder::new()
diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs
new file mode 100644
index 00000000000..9a7d117158c
--- /dev/null
+++ b/object_store/src/integration.rs
@@ -0,0 +1,1082 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for custom object store implementations
+//!
+//! NB: These tests will delete everything present in the provided
[`DynObjectStore`].
+//!
+//! These tests are not a stable part of the public API and breaking changes
may be made
+//! in patch releases.
+//!
+//! They are intended solely for testing purposes.
+
+use crate::multipart::MultipartStore;
+use crate::path::Path;
+use crate::{
+ Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange,
ObjectStore, PutMode,
+ PutPayload, UpdateVersion, WriteMultipart,
+};
+use bytes::Bytes;
+use futures::stream::FuturesUnordered;
+use futures::{StreamExt, TryStreamExt};
+use rand::distributions::Alphanumeric;
+use rand::{thread_rng, Rng};
+
+pub(crate) async fn flatten_list_stream(
+ storage: &DynObjectStore,
+ prefix: Option<&Path>,
+) -> crate::Result<Vec<Path>> {
+ storage
+ .list(prefix)
+ .map_ok(|meta| meta.location)
+ .try_collect::<Vec<Path>>()
+ .await
+}
+
+/// Tests basic read/write and listing operations
+pub async fn put_get_delete_list(storage: &DynObjectStore) {
+ delete_fixtures(storage).await;
+
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
+ assert!(
+ content_list.is_empty(),
+ "Expected list to be empty; found: {content_list:?}"
+ );
+
+ let location = Path::from("test_dir/test_file.json");
+
+ let data = Bytes::from("arbitrary data");
+ storage.put(&location, data.clone().into()).await.unwrap();
+
+ let root = Path::from("/");
+
+ // List everything
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
+ assert_eq!(content_list, &[location.clone()]);
+
+ // Should behave the same as no prefix
+ let content_list = flatten_list_stream(storage,
Some(&root)).await.unwrap();
+ assert_eq!(content_list, &[location.clone()]);
+
+ // List with delimiter
+ let result = storage.list_with_delimiter(None).await.unwrap();
+ assert_eq!(&result.objects, &[]);
+ assert_eq!(result.common_prefixes.len(), 1);
+ assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
+
+ // Should behave the same as no prefix
+ let result = storage.list_with_delimiter(Some(&root)).await.unwrap();
+ assert!(result.objects.is_empty());
+ assert_eq!(result.common_prefixes.len(), 1);
+ assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
+
+ // Should return not found
+ let err = storage.get(&Path::from("test_dir")).await.unwrap_err();
+ assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+ // Should return not found
+ let err = storage.head(&Path::from("test_dir")).await.unwrap_err();
+ assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+ // List everything starting with a prefix that should return results
+ let prefix = Path::from("test_dir");
+ let content_list = flatten_list_stream(storage,
Some(&prefix)).await.unwrap();
+ assert_eq!(content_list, &[location.clone()]);
+
+ // List everything starting with a prefix that shouldn't return results
+ let prefix = Path::from("something");
+ let content_list = flatten_list_stream(storage,
Some(&prefix)).await.unwrap();
+ assert!(content_list.is_empty());
+
+ let read_data =
storage.get(&location).await.unwrap().bytes().await.unwrap();
+ assert_eq!(&*read_data, data);
+
+ // Test range request
+ let range = 3..7;
+ let range_result = storage.get_range(&location, range.clone()).await;
+
+ let bytes = range_result.unwrap();
+ assert_eq!(bytes, data.slice(range.clone()));
+
+ let opts = GetOptions {
+ range: Some(GetRange::Bounded(2..5)),
+ ..Default::default()
+ };
+ let result = storage.get_opts(&location, opts).await.unwrap();
+ // Data is `"arbitrary data"`, length 14 bytes
+ assert_eq!(result.meta.size, 14); // Should return full object size (#5272)
+ assert_eq!(result.range, 2..5);
+ let bytes = result.bytes().await.unwrap();
+ assert_eq!(bytes, b"bit".as_ref());
+
+ let out_of_range = 200..300;
+ let out_of_range_result = storage.get_range(&location, out_of_range).await;
+
+ // Should be a non-fatal error
+ out_of_range_result.unwrap_err();
+
+ let opts = GetOptions {
+ range: Some(GetRange::Bounded(2..100)),
+ ..Default::default()
+ };
+ let result = storage.get_opts(&location, opts).await.unwrap();
+ assert_eq!(result.range, 2..14);
+ assert_eq!(result.meta.size, 14);
+ let bytes = result.bytes().await.unwrap();
+ assert_eq!(bytes, b"bitrary data".as_ref());
+
+ let opts = GetOptions {
+ range: Some(GetRange::Suffix(2)),
+ ..Default::default()
+ };
+ match storage.get_opts(&location, opts).await {
+ Ok(result) => {
+ assert_eq!(result.range, 12..14);
+ assert_eq!(result.meta.size, 14);
+ let bytes = result.bytes().await.unwrap();
+ assert_eq!(bytes, b"ta".as_ref());
+ }
+ Err(Error::NotSupported { .. }) => {}
+ Err(e) => panic!("{e}"),
+ }
+
+ let opts = GetOptions {
+ range: Some(GetRange::Suffix(100)),
+ ..Default::default()
+ };
+ match storage.get_opts(&location, opts).await {
+ Ok(result) => {
+ assert_eq!(result.range, 0..14);
+ assert_eq!(result.meta.size, 14);
+ let bytes = result.bytes().await.unwrap();
+ assert_eq!(bytes, b"arbitrary data".as_ref());
+ }
+ Err(Error::NotSupported { .. }) => {}
+ Err(e) => panic!("{e}"),
+ }
+
+ let opts = GetOptions {
+ range: Some(GetRange::Offset(3)),
+ ..Default::default()
+ };
+ let result = storage.get_opts(&location, opts).await.unwrap();
+ assert_eq!(result.range, 3..14);
+ assert_eq!(result.meta.size, 14);
+ let bytes = result.bytes().await.unwrap();
+ assert_eq!(bytes, b"itrary data".as_ref());
+
+ let opts = GetOptions {
+ range: Some(GetRange::Offset(100)),
+ ..Default::default()
+ };
+ storage.get_opts(&location, opts).await.unwrap_err();
+
+ let ranges = vec![0..1, 2..3, 0..5];
+ let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
+ for (range, bytes) in ranges.iter().zip(bytes) {
+ assert_eq!(bytes, data.slice(range.clone()))
+ }
+
+ let head = storage.head(&location).await.unwrap();
+ assert_eq!(head.size, data.len());
+
+ storage.delete(&location).await.unwrap();
+
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
+ assert!(content_list.is_empty());
+
+ let err = storage.get(&location).await.unwrap_err();
+ assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+ let err = storage.head(&location).await.unwrap_err();
+ assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+ // Test handling of paths containing an encoded delimiter
+
+ let file_with_delimiter = Path::from_iter(["a", "b/c", "foo.file"]);
+ storage
+ .put(&file_with_delimiter, "arbitrary".into())
+ .await
+ .unwrap();
+
+ let files = flatten_list_stream(storage, None).await.unwrap();
+ assert_eq!(files, vec![file_with_delimiter.clone()]);
+
+ let files = flatten_list_stream(storage, Some(&Path::from("a/b")))
+ .await
+ .unwrap();
+ assert!(files.is_empty());
+
+ let files = storage
+ .list_with_delimiter(Some(&Path::from("a/b")))
+ .await
+ .unwrap();
+ assert!(files.common_prefixes.is_empty());
+ assert!(files.objects.is_empty());
+
+ let files = storage
+ .list_with_delimiter(Some(&Path::from("a")))
+ .await
+ .unwrap();
+ assert_eq!(files.common_prefixes, vec![Path::from_iter(["a", "b/c"])]);
+ assert!(files.objects.is_empty());
+
+ let files = storage
+ .list_with_delimiter(Some(&Path::from_iter(["a", "b/c"])))
+ .await
+ .unwrap();
+ assert!(files.common_prefixes.is_empty());
+ assert_eq!(files.objects.len(), 1);
+ assert_eq!(files.objects[0].location, file_with_delimiter);
+
+ storage.delete(&file_with_delimiter).await.unwrap();
+
+ // Test handling of paths containing non-ASCII characters, e.g. emoji
+
+ let emoji_prefix = Path::from("🙀");
+ let emoji_file = Path::from("🙀/😀.parquet");
+ storage.put(&emoji_file, "arbitrary".into()).await.unwrap();
+
+ storage.head(&emoji_file).await.unwrap();
+ storage
+ .get(&emoji_file)
+ .await
+ .unwrap()
+ .bytes()
+ .await
+ .unwrap();
+
+ let files = flatten_list_stream(storage, Some(&emoji_prefix))
+ .await
+ .unwrap();
+
+ assert_eq!(files, vec![emoji_file.clone()]);
+
+ let dst = Path::from("foo.parquet");
+ storage.copy(&emoji_file, &dst).await.unwrap();
+ let mut files = flatten_list_stream(storage, None).await.unwrap();
+ files.sort_unstable();
+ assert_eq!(files, vec![emoji_file.clone(), dst.clone()]);
+
+ let dst2 = Path::from("new/nested/foo.parquet");
+ storage.copy(&emoji_file, &dst2).await.unwrap();
+ let mut files = flatten_list_stream(storage, None).await.unwrap();
+ files.sort_unstable();
+ assert_eq!(files, vec![emoji_file.clone(), dst.clone(), dst2.clone()]);
+
+ let dst3 = Path::from("new/nested2/bar.parquet");
+ storage.rename(&dst, &dst3).await.unwrap();
+ let mut files = flatten_list_stream(storage, None).await.unwrap();
+ files.sort_unstable();
+ assert_eq!(files, vec![emoji_file.clone(), dst2.clone(), dst3.clone()]);
+
+ let err = storage.head(&dst).await.unwrap_err();
+ assert!(matches!(err, Error::NotFound { .. }));
+
+ storage.delete(&emoji_file).await.unwrap();
+ storage.delete(&dst3).await.unwrap();
+ storage.delete(&dst2).await.unwrap();
+ let files = flatten_list_stream(storage, Some(&emoji_prefix))
+ .await
+ .unwrap();
+ assert!(files.is_empty());
+
+ // Test handling of paths containing percent-encoded sequences
+
+ // "HELLO" percent encoded
+ let hello_prefix = Path::parse("%48%45%4C%4C%4F").unwrap();
+ let path = hello_prefix.child("foo.parquet");
+
+ storage.put(&path, vec![0, 1].into()).await.unwrap();
+ let files = flatten_list_stream(storage, Some(&hello_prefix))
+ .await
+ .unwrap();
+ assert_eq!(files, vec![path.clone()]);
+
+ // Cannot list by decoded representation
+ let files = flatten_list_stream(storage, Some(&Path::from("HELLO")))
+ .await
+ .unwrap();
+ assert!(files.is_empty());
+
+ // Cannot access by decoded representation
+ let err = storage
+ .head(&Path::from("HELLO/foo.parquet"))
+ .await
+ .unwrap_err();
+ assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+ storage.delete(&path).await.unwrap();
+
+ // Test handling of unicode paths
+ let path = Path::parse("🇦🇺/$shenanigans@@~.txt").unwrap();
+ storage.put(&path, "test".into()).await.unwrap();
+
+ let r = storage.get(&path).await.unwrap();
+ assert_eq!(r.bytes().await.unwrap(), "test");
+
+ let dir = Path::parse("🇦🇺").unwrap();
+ let r = storage.list_with_delimiter(None).await.unwrap();
+ assert!(r.common_prefixes.contains(&dir));
+
+ let r = storage.list_with_delimiter(Some(&dir)).await.unwrap();
+ assert_eq!(r.objects.len(), 1);
+ assert_eq!(r.objects[0].location, path);
+
+ storage.delete(&path).await.unwrap();
+
+ // Can also write non-percent encoded sequences
+ let path = Path::parse("%Q.parquet").unwrap();
+ storage.put(&path, vec![0, 1].into()).await.unwrap();
+
+ let files = flatten_list_stream(storage, None).await.unwrap();
+ assert_eq!(files, vec![path.clone()]);
+
+ storage.delete(&path).await.unwrap();
+
+ let path = Path::parse("foo bar/I contain spaces.parquet").unwrap();
+ storage.put(&path, vec![0, 1].into()).await.unwrap();
+ storage.head(&path).await.unwrap();
+
+ let files = flatten_list_stream(storage, Some(&Path::from("foo bar")))
+ .await
+ .unwrap();
+ assert_eq!(files, vec![path.clone()]);
+
+ storage.delete(&path).await.unwrap();
+
+ let files = flatten_list_stream(storage, None).await.unwrap();
+ assert!(files.is_empty(), "{files:?}");
+
+ // Test list order
+ let files = vec![
+ Path::from("a a/b.file"),
+ Path::parse("a%2Fa.file").unwrap(),
+ Path::from("a/😀.file"),
+ Path::from("a/a file"),
+ Path::parse("a/a%2F.file").unwrap(),
+ Path::from("a/a.file"),
+ Path::from("a/a/b.file"),
+ Path::from("a/b.file"),
+ Path::from("aa/a.file"),
+ Path::from("ab/a.file"),
+ ];
+
+ for file in &files {
+ storage.put(file, "foo".into()).await.unwrap();
+ }
+
+ let cases = [
+ (None, Path::from("a")),
+ (None, Path::from("a/a file")),
+ (None, Path::from("a/a/b.file")),
+ (None, Path::from("ab/a.file")),
+ (None, Path::from("a%2Fa.file")),
+ (None, Path::from("a/😀.file")),
+ (Some(Path::from("a")), Path::from("")),
+ (Some(Path::from("a")), Path::from("a")),
+ (Some(Path::from("a")), Path::from("a/😀")),
+ (Some(Path::from("a")), Path::from("a/😀.file")),
+ (Some(Path::from("a")), Path::from("a/b")),
+ (Some(Path::from("a")), Path::from("a/a/b.file")),
+ ];
+
+ for (prefix, offset) in cases {
+ let s = storage.list_with_offset(prefix.as_ref(), &offset);
+ let mut actual: Vec<_> = s.map_ok(|x|
x.location).try_collect().await.unwrap();
+
+ actual.sort_unstable();
+
+ let expected: Vec<_> = files
+ .iter()
+ .filter(|x| {
+ let prefix_match = prefix.as_ref().map(|p|
x.prefix_matches(p)).unwrap_or(true);
+ prefix_match && *x > &offset
+ })
+ .cloned()
+ .collect();
+
+ assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
+ }
+
+ // Test bulk delete
+ let paths = vec![
+ Path::from("a/a.file"),
+ Path::from("a/a/b.file"),
+ Path::from("aa/a.file"),
+ Path::from("does_not_exist"),
+ Path::from("I'm a < & weird path"),
+ Path::from("ab/a.file"),
+ Path::from("a/😀.file"),
+ ];
+
+ storage.put(&paths[4], "foo".into()).await.unwrap();
+
+ let out_paths = storage
+ .delete_stream(futures::stream::iter(paths.clone()).map(Ok).boxed())
+ .collect::<Vec<_>>()
+ .await;
+
+ assert_eq!(out_paths.len(), paths.len());
+
+ let expect_errors = [3];
+
+ for (i, input_path) in paths.iter().enumerate() {
+ let err = storage.head(input_path).await.unwrap_err();
+ assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
+
+ if expect_errors.contains(&i) {
+ // Some object stores will report NotFound, but others (such as
S3) will
+ // report success regardless.
+ match &out_paths[i] {
+ Err(Error::NotFound { path: out_path, .. }) => {
+ assert!(out_path.ends_with(&input_path.to_string()));
+ }
+ Ok(out_path) => {
+ assert_eq!(out_path, input_path);
+ }
+ _ => panic!("unexpected error"),
+ }
+ } else {
+ assert_eq!(out_paths[i].as_ref().unwrap(), input_path);
+ }
+ }
+
+ delete_fixtures(storage).await;
+
+ let path = Path::from("empty");
+ storage.put(&path, PutPayload::default()).await.unwrap();
+ let meta = storage.head(&path).await.unwrap();
+ assert_eq!(meta.size, 0);
+ let data = storage.get(&path).await.unwrap().bytes().await.unwrap();
+ assert_eq!(data.len(), 0);
+
+ storage.delete(&path).await.unwrap();
+}
+
+/// Tests the ability to read and write [`Attributes`]
+pub async fn put_get_attributes(integration: &dyn ObjectStore) {
+ // Test handling of attributes
+ let attributes = Attributes::from_iter([
+ (Attribute::CacheControl, "max-age=604800"),
+ (
+ Attribute::ContentDisposition,
+ r#"attachment; filename="test.html""#,
+ ),
+ (Attribute::ContentEncoding, "gzip"),
+ (Attribute::ContentLanguage, "en-US"),
+ (Attribute::ContentType, "text/html; charset=utf-8"),
+ ]);
+
+ let path = Path::from("attributes");
+ let opts = attributes.clone().into();
+ match integration.put_opts(&path, "foo".into(), opts).await {
+ Ok(_) => {
+ let r = integration.get(&path).await.unwrap();
+ assert_eq!(r.attributes, attributes);
+ }
+ Err(Error::NotImplemented) => {}
+ Err(e) => panic!("{e}"),
+ }
+
+ let opts = attributes.clone().into();
+ match integration.put_multipart_opts(&path, opts).await {
+ Ok(mut w) => {
+ w.put_part("foo".into()).await.unwrap();
+ w.complete().await.unwrap();
+
+ let r = integration.get(&path).await.unwrap();
+ assert_eq!(r.attributes, attributes);
+ }
+ Err(Error::NotImplemented) => {}
+ Err(e) => panic!("{e}"),
+ }
+}
+
+/// Tests conditional read requests
+pub async fn get_opts(storage: &dyn ObjectStore) {
+ let path = Path::from("test");
+ storage.put(&path, "foo".into()).await.unwrap();
+ let meta = storage.head(&path).await.unwrap();
+
+ let options = GetOptions {
+ if_unmodified_since: Some(meta.last_modified),
+ ..GetOptions::default()
+ };
+ match storage.get_opts(&path, options).await {
+ Ok(_) | Err(Error::NotSupported { .. }) => {}
+ Err(e) => panic!("{e}"),
+ }
+
+ let options = GetOptions {
+ if_unmodified_since: Some(meta.last_modified +
chrono::Duration::try_hours(10).unwrap()),
+ ..GetOptions::default()
+ };
+ match storage.get_opts(&path, options).await {
+ Ok(_) | Err(Error::NotSupported { .. }) => {}
+ Err(e) => panic!("{e}"),
+ }
+
+ let options = GetOptions {
+ if_unmodified_since: Some(meta.last_modified -
chrono::Duration::try_hours(10).unwrap()),
+ ..GetOptions::default()
+ };
+ match storage.get_opts(&path, options).await {
+ Err(Error::Precondition { .. } | Error::NotSupported { .. }) => {}
+ d => panic!("{d:?}"),
+ }
+
+ let options = GetOptions {
+ if_modified_since: Some(meta.last_modified),
+ ..GetOptions::default()
+ };
+ match storage.get_opts(&path, options).await {
+ Err(Error::NotModified { .. } | Error::NotSupported { .. }) => {}
+ d => panic!("{d:?}"),
+ }
+
+ let options = GetOptions {
+ if_modified_since: Some(meta.last_modified -
chrono::Duration::try_hours(10).unwrap()),
+ ..GetOptions::default()
+ };
+ match storage.get_opts(&path, options).await {
+ Ok(_) | Err(Error::NotSupported { .. }) => {}
+ Err(e) => panic!("{e}"),
+ }
+
+ let tag = meta.e_tag.unwrap();
+ let options = GetOptions {
+ if_match: Some(tag.clone()),
+ ..GetOptions::default()
+ };
+ storage.get_opts(&path, options).await.unwrap();
+
+ let options = GetOptions {
+ if_match: Some("invalid".to_string()),
+ ..GetOptions::default()
+ };
+ let err = storage.get_opts(&path, options).await.unwrap_err();
+ assert!(matches!(err, Error::Precondition { .. }), "{err}");
+
+ let options = GetOptions {
+ if_none_match: Some(tag.clone()),
+ ..GetOptions::default()
+ };
+ let err = storage.get_opts(&path, options).await.unwrap_err();
+ assert!(matches!(err, Error::NotModified { .. }), "{err}");
+
+ let options = GetOptions {
+ if_none_match: Some("invalid".to_string()),
+ ..GetOptions::default()
+ };
+ storage.get_opts(&path, options).await.unwrap();
+
+ let result = storage.put(&path, "test".into()).await.unwrap();
+ let new_tag = result.e_tag.unwrap();
+ assert_ne!(tag, new_tag);
+
+ let meta = storage.head(&path).await.unwrap();
+ assert_eq!(meta.e_tag.unwrap(), new_tag);
+
+ let options = GetOptions {
+ if_match: Some(new_tag),
+ ..GetOptions::default()
+ };
+ storage.get_opts(&path, options).await.unwrap();
+
+ let options = GetOptions {
+ if_match: Some(tag),
+ ..GetOptions::default()
+ };
+ let err = storage.get_opts(&path, options).await.unwrap_err();
+ assert!(matches!(err, Error::Precondition { .. }), "{err}");
+
+ if let Some(version) = meta.version {
+ storage.put(&path, "bar".into()).await.unwrap();
+
+ let options = GetOptions {
+ version: Some(version),
+ ..GetOptions::default()
+ };
+
+ // Can retrieve previous version
+ let get_opts = storage.get_opts(&path, options).await.unwrap();
+ let old = get_opts.bytes().await.unwrap();
+ assert_eq!(old, b"test".as_slice());
+
+ // Current version contains the updated data
+ let current = storage.get(&path).await.unwrap().bytes().await.unwrap();
+ assert_eq!(&current, b"bar".as_slice());
+ }
+}
+
+/// Tests conditional writes
+pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
+ // When using DynamoCommit repeated runs of this test will produce the
same sequence of records in DynamoDB
+ // As a result each conditional operation will need to wait for the lease
to timeout before proceeding
+ // One solution would be to clear DynamoDB before each test, but this
would require non-trivial additional code
+ // so we instead just generate a random suffix for the filenames
+ let rng = thread_rng();
+ let suffix =
String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
+
+ delete_fixtures(storage).await;
+ let path = Path::from(format!("put_opts_{suffix}"));
+ let v1 = storage
+ .put_opts(&path, "a".into(), PutMode::Create.into())
+ .await
+ .unwrap();
+
+ let err = storage
+ .put_opts(&path, "b".into(), PutMode::Create.into())
+ .await
+ .unwrap_err();
+ assert!(matches!(err, Error::AlreadyExists { .. }), "{err}");
+
+ let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
+ assert_eq!(b.as_ref(), b"a");
+
+ if !supports_update {
+ return;
+ }
+
+ let v2 = storage
+ .put_opts(&path, "c".into(), PutMode::Update(v1.clone().into()).into())
+ .await
+ .unwrap();
+
+ let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
+ assert_eq!(b.as_ref(), b"c");
+
+ let err = storage
+ .put_opts(&path, "d".into(), PutMode::Update(v1.into()).into())
+ .await
+ .unwrap_err();
+ assert!(matches!(err, Error::Precondition { .. }), "{err}");
+
+ storage
+ .put_opts(&path, "e".into(), PutMode::Update(v2.clone().into()).into())
+ .await
+ .unwrap();
+
+ let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
+ assert_eq!(b.as_ref(), b"e");
+
+ // Update not exists
+ let path = Path::from("I don't exist");
+ let err = storage
+ .put_opts(&path, "e".into(), PutMode::Update(v2.into()).into())
+ .await
+ .unwrap_err();
+ assert!(matches!(err, Error::Precondition { .. }), "{err}");
+
+ const NUM_WORKERS: usize = 5;
+ const NUM_INCREMENTS: usize = 10;
+
+ let path = Path::from(format!("RACE-{suffix}"));
+ let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
+ .map(|_| async {
+ for _ in 0..NUM_INCREMENTS {
+ loop {
+ match storage.get(&path).await {
+ Ok(r) => {
+ let mode = PutMode::Update(UpdateVersion {
+ e_tag: r.meta.e_tag.clone(),
+ version: r.meta.version.clone(),
+ });
+
+ let b = r.bytes().await.unwrap();
+ let v: usize =
std::str::from_utf8(&b).unwrap().parse().unwrap();
+ let new = (v + 1).to_string();
+
+ match storage.put_opts(&path, new.into(),
mode.into()).await {
+ Ok(_) => break,
+ Err(Error::Precondition { .. }) => continue,
+ Err(e) => return Err(e),
+ }
+ }
+ Err(Error::NotFound { .. }) => {
+ let mode = PutMode::Create;
+ match storage.put_opts(&path, "1".into(),
mode.into()).await {
+ Ok(_) => break,
+ Err(Error::AlreadyExists { .. }) => continue,
+ Err(e) => return Err(e),
+ }
+ }
+ Err(e) => return Err(e),
+ }
+ }
+ }
+ Ok(())
+ })
+ .collect();
+
+ while futures.next().await.transpose().unwrap().is_some() {}
+ let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
+ let v = std::str::from_utf8(&b).unwrap().parse::<usize>().unwrap();
+ assert_eq!(v, NUM_WORKERS * NUM_INCREMENTS);
+}
+
+/// Returns a chunk of length `chunk_length`
+fn get_chunk(chunk_length: usize) -> Bytes {
+ let mut data = vec![0_u8; chunk_length];
+ let mut rng = thread_rng();
+ // Set a random selection of bytes
+ for _ in 0..1000 {
+ data[rng.gen_range(0..chunk_length)] = rng.gen();
+ }
+ data.into()
+}
+
+/// Returns `num_chunks` chunks, each of length `chunk_length`
+fn get_chunks(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
+ (0..num_chunks).map(|_| get_chunk(chunk_length)).collect()
+}
+
+/// Tests the ability to perform multipart writes
+pub async fn stream_get(storage: &DynObjectStore) {
+ let location = Path::from("test_dir/test_upload_file.txt");
+
+ // Can write to storage
+ let data = get_chunks(5 * 1024 * 1024, 3);
+ let bytes_expected = data.concat();
+ let mut upload = storage.put_multipart(&location).await.unwrap();
+ let uploads = data.into_iter().map(|x| upload.put_part(x.into()));
+ futures::future::try_join_all(uploads).await.unwrap();
+
+ // Object should not yet exist in store
+ let meta_res = storage.head(&location).await;
+ assert!(meta_res.is_err());
+ assert!(matches!(
+ meta_res.unwrap_err(),
+ crate::Error::NotFound { .. }
+ ));
+
+ let files = flatten_list_stream(storage, None).await.unwrap();
+ assert_eq!(&files, &[]);
+
+ let result = storage.list_with_delimiter(None).await.unwrap();
+ assert_eq!(&result.objects, &[]);
+
+ upload.complete().await.unwrap();
+
+ let bytes_written =
storage.get(&location).await.unwrap().bytes().await.unwrap();
+ assert_eq!(bytes_expected, bytes_written);
+
+ // Can overwrite some storage
+ // Sizes chosen to ensure we write three parts
+ let data = get_chunks(3_200_000, 7);
+ let bytes_expected = data.concat();
+ let upload = storage.put_multipart(&location).await.unwrap();
+ let mut writer = WriteMultipart::new(upload);
+ for chunk in &data {
+ writer.write(chunk)
+ }
+ writer.finish().await.unwrap();
+ let bytes_written =
storage.get(&location).await.unwrap().bytes().await.unwrap();
+ assert_eq!(bytes_expected, bytes_written);
+
+ // We can abort an empty write
+ let location = Path::from("test_dir/test_abort_upload.txt");
+ let mut upload = storage.put_multipart(&location).await.unwrap();
+ upload.abort().await.unwrap();
+ let get_res = storage.get(&location).await;
+ assert!(get_res.is_err());
+ assert!(matches!(
+ get_res.unwrap_err(),
+ crate::Error::NotFound { .. }
+ ));
+
+ // We can abort an in-progress write
+ let mut upload = storage.put_multipart(&location).await.unwrap();
+ upload
+ .put_part(data.first().unwrap().clone().into())
+ .await
+ .unwrap();
+
+ upload.abort().await.unwrap();
+ let get_res = storage.get(&location).await;
+ assert!(get_res.is_err());
+ assert!(matches!(get_res.unwrap_err(), Error::NotFound { .. }));
+}
+
+/// Tests that directories are transparent
+pub async fn list_uses_directories_correctly(storage: &DynObjectStore) {
+ delete_fixtures(storage).await;
+
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
+ assert!(
+ content_list.is_empty(),
+ "Expected list to be empty; found: {content_list:?}"
+ );
+
+ let location1 = Path::from("foo/x.json");
+ let location2 = Path::from("foo.bar/y.json");
+
+ let data = PutPayload::from("arbitrary data");
+ storage.put(&location1, data.clone()).await.unwrap();
+ storage.put(&location2, data).await.unwrap();
+
+ let prefix = Path::from("foo");
+ let content_list = flatten_list_stream(storage,
Some(&prefix)).await.unwrap();
+ assert_eq!(content_list, &[location1.clone()]);
+
+ let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+ assert_eq!(result.objects.len(), 1);
+ assert_eq!(result.objects[0].location, location1);
+ assert_eq!(result.common_prefixes, &[]);
+
+ // Listing an existing path (file) should return an empty list:
+ // https://github.com/apache/arrow-rs/issues/3712
+ let content_list = flatten_list_stream(storage, Some(&location1))
+ .await
+ .unwrap();
+ assert_eq!(content_list, &[]);
+
+ let list = storage.list_with_delimiter(Some(&location1)).await.unwrap();
+ assert_eq!(list.objects, &[]);
+ assert_eq!(list.common_prefixes, &[]);
+
+ let prefix = Path::from("foo/x");
+ let content_list = flatten_list_stream(storage,
Some(&prefix)).await.unwrap();
+ assert_eq!(content_list, &[]);
+
+ let list = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+ assert_eq!(list.objects, &[]);
+ assert_eq!(list.common_prefixes, &[]);
+}
+
+/// Tests listing with delimiter
+pub async fn list_with_delimiter(storage: &DynObjectStore) {
+ delete_fixtures(storage).await;
+
+ // ==================== check: store is empty ====================
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
+ assert!(content_list.is_empty());
+
+ // ==================== do: create files ====================
+ let data = Bytes::from("arbitrary data");
+
+ let files: Vec<_> = [
+ "test_file",
+ "mydb/wb/000/000/000.segment",
+ "mydb/wb/000/000/001.segment",
+ "mydb/wb/000/000/002.segment",
+ "mydb/wb/001/001/000.segment",
+ "mydb/wb/foo.json",
+ "mydb/wbwbwb/111/222/333.segment",
+ "mydb/data/whatevs",
+ ]
+ .iter()
+ .map(|&s| Path::from(s))
+ .collect();
+
+ for f in &files {
+ storage.put(f, data.clone().into()).await.unwrap();
+ }
+
+ // ==================== check: prefix-list `mydb/wb` (directory)
====================
+ let prefix = Path::from("mydb/wb");
+
+ let expected_000 = Path::from("mydb/wb/000");
+ let expected_001 = Path::from("mydb/wb/001");
+ let expected_location = Path::from("mydb/wb/foo.json");
+
+ let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+
+ assert_eq!(result.common_prefixes, vec![expected_000, expected_001]);
+ assert_eq!(result.objects.len(), 1);
+
+ let object = &result.objects[0];
+
+ assert_eq!(object.location, expected_location);
+ assert_eq!(object.size, data.len());
+
+ // ==================== check: prefix-list `mydb/wb/000/000/001` (partial
filename doesn't match) ====================
+ let prefix = Path::from("mydb/wb/000/000/001");
+
+ let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+ assert!(result.common_prefixes.is_empty());
+ assert_eq!(result.objects.len(), 0);
+
+ // ==================== check: prefix-list `not_there` (non-existing
prefix) ====================
+ let prefix = Path::from("not_there");
+
+ let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
+ assert!(result.common_prefixes.is_empty());
+ assert!(result.objects.is_empty());
+
+ // ==================== do: remove all files ====================
+ for f in &files {
+ storage.delete(f).await.unwrap();
+ }
+
+ // ==================== check: store is empty ====================
+ let content_list = flatten_list_stream(storage, None).await.unwrap();
+ assert!(content_list.is_empty());
+}
+
+/// Tests fetching a non-existent object returns a not found error
+pub async fn get_nonexistent_object(
+ storage: &DynObjectStore,
+ location: Option<Path>,
+) -> crate::Result<Bytes> {
+ let location = location.unwrap_or_else(||
Path::from("this_file_should_not_exist"));
+
+ let err = storage.head(&location).await.unwrap_err();
+ assert!(matches!(err, Error::NotFound { .. }));
+
+ storage.get(&location).await?.bytes().await
+}
+
+/// Tests copying
+pub async fn rename_and_copy(storage: &DynObjectStore) {
+ // Create two objects
+ let path1 = Path::from("test1");
+ let path2 = Path::from("test2");
+ let contents1 = Bytes::from("cats");
+ let contents2 = Bytes::from("dogs");
+
+ // copy() make both objects identical
+ storage.put(&path1, contents1.clone().into()).await.unwrap();
+ storage.put(&path2, contents2.clone().into()).await.unwrap();
+ storage.copy(&path1, &path2).await.unwrap();
+ let new_contents =
storage.get(&path2).await.unwrap().bytes().await.unwrap();
+ assert_eq!(&new_contents, &contents1);
+
+ // rename() copies contents and deletes original
+ storage.put(&path1, contents1.clone().into()).await.unwrap();
+ storage.put(&path2, contents2.clone().into()).await.unwrap();
+ storage.rename(&path1, &path2).await.unwrap();
+ let new_contents =
storage.get(&path2).await.unwrap().bytes().await.unwrap();
+ assert_eq!(&new_contents, &contents1);
+ let result = storage.get(&path1).await;
+ assert!(result.is_err());
+ assert!(matches!(result.unwrap_err(), Error::NotFound { .. }));
+
+ // Clean up
+ storage.delete(&path2).await.unwrap();
+}
+
+/// Tests copy if not exists
+pub async fn copy_if_not_exists(storage: &DynObjectStore) {
+ // Create two objects
+ let path1 = Path::from("test1");
+ let path2 = Path::from("not_exists_nested/test2");
+ let contents1 = Bytes::from("cats");
+ let contents2 = Bytes::from("dogs");
+
+ // copy_if_not_exists() errors if destination already exists
+ storage.put(&path1, contents1.clone().into()).await.unwrap();
+ storage.put(&path2, contents2.clone().into()).await.unwrap();
+ let result = storage.copy_if_not_exists(&path1, &path2).await;
+ assert!(result.is_err());
+ assert!(matches!(
+ result.unwrap_err(),
+ crate::Error::AlreadyExists { .. }
+ ));
+
+ // copy_if_not_exists() copies contents and allows deleting original
+ storage.delete(&path2).await.unwrap();
+ storage.copy_if_not_exists(&path1, &path2).await.unwrap();
+ storage.delete(&path1).await.unwrap();
+ let new_contents =
storage.get(&path2).await.unwrap().bytes().await.unwrap();
+ assert_eq!(&new_contents, &contents1);
+ let result = storage.get(&path1).await;
+ assert!(result.is_err());
+ assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+ // Clean up
+ storage.delete(&path2).await.unwrap();
+}
+
+/// Tests copy and renaming behaviour of non-existent objects
+pub async fn copy_rename_nonexistent_object(storage: &DynObjectStore) {
+ // Create empty source object
+ let path1 = Path::from("test1");
+
+ // Create destination object
+ let path2 = Path::from("test2");
+ storage.put(&path2, "hello".into()).await.unwrap();
+
+ // copy() errors if source does not exist
+ let result = storage.copy(&path1, &path2).await;
+ assert!(result.is_err());
+ assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+ // rename() errors if source does not exist
+ let result = storage.rename(&path1, &path2).await;
+ assert!(result.is_err());
+ assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+ // copy_if_not_exists() errors if source does not exist
+ let result = storage.copy_if_not_exists(&path1, &path2).await;
+ assert!(result.is_err());
+ assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
+
+ // Clean up
+ storage.delete(&path2).await.unwrap();
+}
+
+/// Tests [`MultipartStore`]
+pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn
MultipartStore) {
+ let path = Path::from("test_multipart");
+ let chunk_size = 5 * 1024 * 1024;
+
+ let chunks = get_chunks(chunk_size, 2);
+
+ let id = multipart.create_multipart(&path).await.unwrap();
+
+ let parts: Vec<_> = futures::stream::iter(chunks)
+ .enumerate()
+ .map(|(idx, b)| multipart.put_part(&path, &id, idx, b.into()))
+ .buffered(2)
+ .try_collect()
+ .await
+ .unwrap();
+
+ multipart
+ .complete_multipart(&path, &id, parts)
+ .await
+ .unwrap();
+
+ let meta = storage.head(&path).await.unwrap();
+ assert_eq!(meta.size, chunk_size * 2);
+
+ // Empty case
+ let path = Path::from("test_empty_multipart");
+
+ let id = multipart.create_multipart(&path).await.unwrap();
+
+ let parts = vec![];
+
+ multipart
+ .complete_multipart(&path, &id, parts)
+ .await
+ .unwrap();
+
+ let meta = storage.head(&path).await.unwrap();
+ assert_eq!(meta.size, 0);
+}
+
+async fn delete_fixtures(storage: &DynObjectStore) {
+ let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
+ storage
+ .delete_stream(paths)
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+}
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 9a8f77b4d82..bdf870f45ab 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -545,6 +545,9 @@ mod util;
mod attributes;
+#[cfg(any(feature = "integration", test))]
+pub mod integration;
+
pub use attributes::*;
pub use parse::{parse_url, parse_url_opts};
@@ -1285,9 +1288,11 @@ impl From<Error> for std::io::Error {
}
#[cfg(test)]
-mod test_util {
+mod tests {
use super::*;
- use futures::TryStreamExt;
+ use crate::buffered::BufWriter;
+ use chrono::TimeZone;
+ use tokio::io::AsyncWriteExt;
macro_rules! maybe_skip_integration {
() => {
@@ -1299,1054 +1304,19 @@ mod test_util {
}
pub(crate) use maybe_skip_integration;
- pub async fn flatten_list_stream(
- storage: &DynObjectStore,
- prefix: Option<&Path>,
- ) -> Result<Vec<Path>> {
- storage
- .list(prefix)
- .map_ok(|meta| meta.location)
- .try_collect::<Vec<Path>>()
- .await
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::buffered::BufWriter;
- use crate::multipart::MultipartStore;
- use crate::test_util::flatten_list_stream;
- use chrono::TimeZone;
- use futures::stream::FuturesUnordered;
- use rand::distributions::Alphanumeric;
- use rand::{thread_rng, Rng};
- use tokio::io::AsyncWriteExt;
-
- pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
- delete_fixtures(storage).await;
-
- let content_list = flatten_list_stream(storage, None).await.unwrap();
- assert!(
- content_list.is_empty(),
- "Expected list to be empty; found: {content_list:?}"
- );
-
- let location = Path::from("test_dir/test_file.json");
-
- let data = Bytes::from("arbitrary data");
- storage.put(&location, data.clone().into()).await.unwrap();
-
- let root = Path::from("/");
-
- // List everything
- let content_list = flatten_list_stream(storage, None).await.unwrap();
- assert_eq!(content_list, &[location.clone()]);
-
- // Should behave the same as no prefix
- let content_list = flatten_list_stream(storage,
Some(&root)).await.unwrap();
- assert_eq!(content_list, &[location.clone()]);
-
- // List with delimiter
- let result = storage.list_with_delimiter(None).await.unwrap();
- assert_eq!(&result.objects, &[]);
- assert_eq!(result.common_prefixes.len(), 1);
- assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
-
- // Should behave the same as no prefix
- let result = storage.list_with_delimiter(Some(&root)).await.unwrap();
- assert!(result.objects.is_empty());
- assert_eq!(result.common_prefixes.len(), 1);
- assert_eq!(result.common_prefixes[0], Path::from("test_dir"));
-
- // Should return not found
- let err = storage.get(&Path::from("test_dir")).await.unwrap_err();
- assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
- // Should return not found
- let err = storage.head(&Path::from("test_dir")).await.unwrap_err();
- assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
- // List everything starting with a prefix that should return results
- let prefix = Path::from("test_dir");
- let content_list = flatten_list_stream(storage,
Some(&prefix)).await.unwrap();
- assert_eq!(content_list, &[location.clone()]);
-
- // List everything starting with a prefix that shouldn't return results
- let prefix = Path::from("something");
- let content_list = flatten_list_stream(storage,
Some(&prefix)).await.unwrap();
- assert!(content_list.is_empty());
-
- let read_data =
storage.get(&location).await.unwrap().bytes().await.unwrap();
- assert_eq!(&*read_data, data);
-
- // Test range request
- let range = 3..7;
- let range_result = storage.get_range(&location, range.clone()).await;
-
- let bytes = range_result.unwrap();
- assert_eq!(bytes, data.slice(range.clone()));
-
- let opts = GetOptions {
- range: Some(GetRange::Bounded(2..5)),
- ..Default::default()
- };
- let result = storage.get_opts(&location, opts).await.unwrap();
- // Data is `"arbitrary data"`, length 14 bytes
- assert_eq!(result.meta.size, 14); // Should return full object size
(#5272)
- assert_eq!(result.range, 2..5);
- let bytes = result.bytes().await.unwrap();
- assert_eq!(bytes, b"bit".as_ref());
-
- let out_of_range = 200..300;
- let out_of_range_result = storage.get_range(&location,
out_of_range).await;
-
- // Should be a non-fatal error
- out_of_range_result.unwrap_err();
-
- let opts = GetOptions {
- range: Some(GetRange::Bounded(2..100)),
- ..Default::default()
- };
- let result = storage.get_opts(&location, opts).await.unwrap();
- assert_eq!(result.range, 2..14);
- assert_eq!(result.meta.size, 14);
- let bytes = result.bytes().await.unwrap();
- assert_eq!(bytes, b"bitrary data".as_ref());
-
- let opts = GetOptions {
- range: Some(GetRange::Suffix(2)),
- ..Default::default()
- };
- match storage.get_opts(&location, opts).await {
- Ok(result) => {
- assert_eq!(result.range, 12..14);
- assert_eq!(result.meta.size, 14);
- let bytes = result.bytes().await.unwrap();
- assert_eq!(bytes, b"ta".as_ref());
- }
- Err(Error::NotSupported { .. }) => {}
- Err(e) => panic!("{e}"),
- }
-
- let opts = GetOptions {
- range: Some(GetRange::Suffix(100)),
- ..Default::default()
- };
- match storage.get_opts(&location, opts).await {
- Ok(result) => {
- assert_eq!(result.range, 0..14);
- assert_eq!(result.meta.size, 14);
- let bytes = result.bytes().await.unwrap();
- assert_eq!(bytes, b"arbitrary data".as_ref());
- }
- Err(Error::NotSupported { .. }) => {}
- Err(e) => panic!("{e}"),
- }
-
- let opts = GetOptions {
- range: Some(GetRange::Offset(3)),
- ..Default::default()
- };
- let result = storage.get_opts(&location, opts).await.unwrap();
- assert_eq!(result.range, 3..14);
- assert_eq!(result.meta.size, 14);
- let bytes = result.bytes().await.unwrap();
- assert_eq!(bytes, b"itrary data".as_ref());
-
- let opts = GetOptions {
- range: Some(GetRange::Offset(100)),
- ..Default::default()
- };
- storage.get_opts(&location, opts).await.unwrap_err();
-
- let ranges = vec![0..1, 2..3, 0..5];
- let bytes = storage.get_ranges(&location, &ranges).await.unwrap();
- for (range, bytes) in ranges.iter().zip(bytes) {
- assert_eq!(bytes, data.slice(range.clone()))
- }
-
- let head = storage.head(&location).await.unwrap();
- assert_eq!(head.size, data.len());
-
- storage.delete(&location).await.unwrap();
-
- let content_list = flatten_list_stream(storage, None).await.unwrap();
- assert!(content_list.is_empty());
-
- let err = storage.get(&location).await.unwrap_err();
- assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
- let err = storage.head(&location).await.unwrap_err();
- assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
- // Test handling of paths containing an encoded delimiter
-
- let file_with_delimiter = Path::from_iter(["a", "b/c", "foo.file"]);
- storage
- .put(&file_with_delimiter, "arbitrary".into())
- .await
- .unwrap();
-
- let files = flatten_list_stream(storage, None).await.unwrap();
- assert_eq!(files, vec![file_with_delimiter.clone()]);
-
- let files = flatten_list_stream(storage, Some(&Path::from("a/b")))
- .await
- .unwrap();
- assert!(files.is_empty());
-
- let files = storage
- .list_with_delimiter(Some(&Path::from("a/b")))
- .await
- .unwrap();
- assert!(files.common_prefixes.is_empty());
- assert!(files.objects.is_empty());
-
- let files = storage
- .list_with_delimiter(Some(&Path::from("a")))
- .await
- .unwrap();
- assert_eq!(files.common_prefixes, vec![Path::from_iter(["a", "b/c"])]);
- assert!(files.objects.is_empty());
-
- let files = storage
- .list_with_delimiter(Some(&Path::from_iter(["a", "b/c"])))
- .await
- .unwrap();
- assert!(files.common_prefixes.is_empty());
- assert_eq!(files.objects.len(), 1);
- assert_eq!(files.objects[0].location, file_with_delimiter);
-
- storage.delete(&file_with_delimiter).await.unwrap();
-
- // Test handling of paths containing non-ASCII characters, e.g. emoji
-
- let emoji_prefix = Path::from("🙀");
- let emoji_file = Path::from("🙀/😀.parquet");
- storage.put(&emoji_file, "arbitrary".into()).await.unwrap();
-
- storage.head(&emoji_file).await.unwrap();
- storage
- .get(&emoji_file)
- .await
- .unwrap()
- .bytes()
- .await
- .unwrap();
-
- let files = flatten_list_stream(storage, Some(&emoji_prefix))
- .await
- .unwrap();
-
- assert_eq!(files, vec![emoji_file.clone()]);
-
- let dst = Path::from("foo.parquet");
- storage.copy(&emoji_file, &dst).await.unwrap();
- let mut files = flatten_list_stream(storage, None).await.unwrap();
- files.sort_unstable();
- assert_eq!(files, vec![emoji_file.clone(), dst.clone()]);
-
- let dst2 = Path::from("new/nested/foo.parquet");
- storage.copy(&emoji_file, &dst2).await.unwrap();
- let mut files = flatten_list_stream(storage, None).await.unwrap();
- files.sort_unstable();
- assert_eq!(files, vec![emoji_file.clone(), dst.clone(), dst2.clone()]);
-
- let dst3 = Path::from("new/nested2/bar.parquet");
- storage.rename(&dst, &dst3).await.unwrap();
- let mut files = flatten_list_stream(storage, None).await.unwrap();
- files.sort_unstable();
- assert_eq!(files, vec![emoji_file.clone(), dst2.clone(),
dst3.clone()]);
-
- let err = storage.head(&dst).await.unwrap_err();
- assert!(matches!(err, Error::NotFound { .. }));
-
- storage.delete(&emoji_file).await.unwrap();
- storage.delete(&dst3).await.unwrap();
- storage.delete(&dst2).await.unwrap();
- let files = flatten_list_stream(storage, Some(&emoji_prefix))
- .await
- .unwrap();
- assert!(files.is_empty());
-
- // Test handling of paths containing percent-encoded sequences
-
- // "HELLO" percent encoded
- let hello_prefix = Path::parse("%48%45%4C%4C%4F").unwrap();
- let path = hello_prefix.child("foo.parquet");
-
- storage.put(&path, vec![0, 1].into()).await.unwrap();
- let files = flatten_list_stream(storage, Some(&hello_prefix))
- .await
- .unwrap();
- assert_eq!(files, vec![path.clone()]);
-
- // Cannot list by decoded representation
- let files = flatten_list_stream(storage, Some(&Path::from("HELLO")))
- .await
- .unwrap();
- assert!(files.is_empty());
-
- // Cannot access by decoded representation
- let err = storage
- .head(&Path::from("HELLO/foo.parquet"))
- .await
- .unwrap_err();
- assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
- storage.delete(&path).await.unwrap();
-
- // Test handling of unicode paths
- let path = Path::parse("🇦🇺/$shenanigans@@~.txt").unwrap();
- storage.put(&path, "test".into()).await.unwrap();
-
- let r = storage.get(&path).await.unwrap();
- assert_eq!(r.bytes().await.unwrap(), "test");
-
- let dir = Path::parse("🇦🇺").unwrap();
- let r = storage.list_with_delimiter(None).await.unwrap();
- assert!(r.common_prefixes.contains(&dir));
-
- let r = storage.list_with_delimiter(Some(&dir)).await.unwrap();
- assert_eq!(r.objects.len(), 1);
- assert_eq!(r.objects[0].location, path);
-
- storage.delete(&path).await.unwrap();
-
- // Can also write non-percent encoded sequences
- let path = Path::parse("%Q.parquet").unwrap();
- storage.put(&path, vec![0, 1].into()).await.unwrap();
-
- let files = flatten_list_stream(storage, None).await.unwrap();
- assert_eq!(files, vec![path.clone()]);
-
- storage.delete(&path).await.unwrap();
-
- let path = Path::parse("foo bar/I contain spaces.parquet").unwrap();
- storage.put(&path, vec![0, 1].into()).await.unwrap();
- storage.head(&path).await.unwrap();
-
- let files = flatten_list_stream(storage, Some(&Path::from("foo bar")))
- .await
- .unwrap();
- assert_eq!(files, vec![path.clone()]);
-
- storage.delete(&path).await.unwrap();
-
- let files = flatten_list_stream(storage, None).await.unwrap();
- assert!(files.is_empty(), "{files:?}");
-
- // Test list order
- let files = vec![
- Path::from("a a/b.file"),
- Path::parse("a%2Fa.file").unwrap(),
- Path::from("a/😀.file"),
- Path::from("a/a file"),
- Path::parse("a/a%2F.file").unwrap(),
- Path::from("a/a.file"),
- Path::from("a/a/b.file"),
- Path::from("a/b.file"),
- Path::from("aa/a.file"),
- Path::from("ab/a.file"),
- ];
-
- for file in &files {
- storage.put(file, "foo".into()).await.unwrap();
- }
-
- let cases = [
- (None, Path::from("a")),
- (None, Path::from("a/a file")),
- (None, Path::from("a/a/b.file")),
- (None, Path::from("ab/a.file")),
- (None, Path::from("a%2Fa.file")),
- (None, Path::from("a/😀.file")),
- (Some(Path::from("a")), Path::from("")),
- (Some(Path::from("a")), Path::from("a")),
- (Some(Path::from("a")), Path::from("a/😀")),
- (Some(Path::from("a")), Path::from("a/😀.file")),
- (Some(Path::from("a")), Path::from("a/b")),
- (Some(Path::from("a")), Path::from("a/a/b.file")),
- ];
-
- for (prefix, offset) in cases {
- let s = storage.list_with_offset(prefix.as_ref(), &offset);
- let mut actual: Vec<_> = s.map_ok(|x|
x.location).try_collect().await.unwrap();
-
- actual.sort_unstable();
-
- let expected: Vec<_> = files
- .iter()
- .filter(|x| {
- let prefix_match = prefix.as_ref().map(|p|
x.prefix_matches(p)).unwrap_or(true);
- prefix_match && *x > &offset
- })
- .cloned()
- .collect();
-
- assert_eq!(actual, expected, "{prefix:?} - {offset:?}");
- }
-
- // Test bulk delete
- let paths = vec![
- Path::from("a/a.file"),
- Path::from("a/a/b.file"),
- Path::from("aa/a.file"),
- Path::from("does_not_exist"),
- Path::from("I'm a < & weird path"),
- Path::from("ab/a.file"),
- Path::from("a/😀.file"),
- ];
-
- storage.put(&paths[4], "foo".into()).await.unwrap();
-
- let out_paths = storage
-
.delete_stream(futures::stream::iter(paths.clone()).map(Ok).boxed())
- .collect::<Vec<_>>()
- .await;
-
- assert_eq!(out_paths.len(), paths.len());
-
- let expect_errors = [3];
-
- for (i, input_path) in paths.iter().enumerate() {
- let err = storage.head(input_path).await.unwrap_err();
- assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
-
- if expect_errors.contains(&i) {
- // Some object stores will report NotFound, but others (such
as S3) will
- // report success regardless.
- match &out_paths[i] {
- Err(Error::NotFound { path: out_path, .. }) => {
- assert!(out_path.ends_with(&input_path.to_string()));
- }
- Ok(out_path) => {
- assert_eq!(out_path, input_path);
- }
- _ => panic!("unexpected error"),
- }
- } else {
- assert_eq!(out_paths[i].as_ref().unwrap(), input_path);
- }
- }
-
- delete_fixtures(storage).await;
-
- let path = Path::from("empty");
- storage.put(&path, PutPayload::default()).await.unwrap();
- let meta = storage.head(&path).await.unwrap();
- assert_eq!(meta.size, 0);
- let data = storage.get(&path).await.unwrap().bytes().await.unwrap();
- assert_eq!(data.len(), 0);
-
- storage.delete(&path).await.unwrap();
- }
-
- pub(crate) async fn put_get_attributes(integration: &dyn ObjectStore) {
- // Test handling of attributes
- let attributes = Attributes::from_iter([
- (Attribute::CacheControl, "max-age=604800"),
- (
- Attribute::ContentDisposition,
- r#"attachment; filename="test.html""#,
- ),
- (Attribute::ContentEncoding, "gzip"),
- (Attribute::ContentLanguage, "en-US"),
- (Attribute::ContentType, "text/html; charset=utf-8"),
- ]);
-
- let path = Path::from("attributes");
- let opts = attributes.clone().into();
- match integration.put_opts(&path, "foo".into(), opts).await {
- Ok(_) => {
- let r = integration.get(&path).await.unwrap();
- assert_eq!(r.attributes, attributes);
- }
- Err(Error::NotImplemented) => {}
- Err(e) => panic!("{e}"),
- }
-
- let opts = attributes.clone().into();
- match integration.put_multipart_opts(&path, opts).await {
- Ok(mut w) => {
- w.put_part("foo".into()).await.unwrap();
- w.complete().await.unwrap();
-
- let r = integration.get(&path).await.unwrap();
- assert_eq!(r.attributes, attributes);
- }
- Err(Error::NotImplemented) => {}
- Err(e) => panic!("{e}"),
- }
- }
-
- pub(crate) async fn get_opts(storage: &dyn ObjectStore) {
- let path = Path::from("test");
- storage.put(&path, "foo".into()).await.unwrap();
- let meta = storage.head(&path).await.unwrap();
-
- let options = GetOptions {
- if_unmodified_since: Some(meta.last_modified),
- ..GetOptions::default()
- };
- match storage.get_opts(&path, options).await {
- Ok(_) | Err(Error::NotSupported { .. }) => {}
- Err(e) => panic!("{e}"),
- }
-
- let options = GetOptions {
- if_unmodified_since: Some(
- meta.last_modified + chrono::Duration::try_hours(10).unwrap(),
- ),
- ..GetOptions::default()
- };
- match storage.get_opts(&path, options).await {
- Ok(_) | Err(Error::NotSupported { .. }) => {}
- Err(e) => panic!("{e}"),
- }
-
- let options = GetOptions {
- if_unmodified_since: Some(
- meta.last_modified - chrono::Duration::try_hours(10).unwrap(),
- ),
- ..GetOptions::default()
- };
- match storage.get_opts(&path, options).await {
- Err(Error::Precondition { .. } | Error::NotSupported { .. }) => {}
- d => panic!("{d:?}"),
- }
-
- let options = GetOptions {
- if_modified_since: Some(meta.last_modified),
- ..GetOptions::default()
- };
- match storage.get_opts(&path, options).await {
- Err(Error::NotModified { .. } | Error::NotSupported { .. }) => {}
- d => panic!("{d:?}"),
- }
-
- let options = GetOptions {
- if_modified_since: Some(meta.last_modified -
chrono::Duration::try_hours(10).unwrap()),
- ..GetOptions::default()
- };
- match storage.get_opts(&path, options).await {
- Ok(_) | Err(Error::NotSupported { .. }) => {}
- Err(e) => panic!("{e}"),
- }
-
- let tag = meta.e_tag.unwrap();
- let options = GetOptions {
- if_match: Some(tag.clone()),
- ..GetOptions::default()
- };
- storage.get_opts(&path, options).await.unwrap();
-
- let options = GetOptions {
- if_match: Some("invalid".to_string()),
- ..GetOptions::default()
- };
- let err = storage.get_opts(&path, options).await.unwrap_err();
- assert!(matches!(err, Error::Precondition { .. }), "{err}");
-
- let options = GetOptions {
- if_none_match: Some(tag.clone()),
- ..GetOptions::default()
- };
- let err = storage.get_opts(&path, options).await.unwrap_err();
- assert!(matches!(err, Error::NotModified { .. }), "{err}");
-
- let options = GetOptions {
- if_none_match: Some("invalid".to_string()),
- ..GetOptions::default()
- };
- storage.get_opts(&path, options).await.unwrap();
-
- let result = storage.put(&path, "test".into()).await.unwrap();
- let new_tag = result.e_tag.unwrap();
- assert_ne!(tag, new_tag);
-
- let meta = storage.head(&path).await.unwrap();
- assert_eq!(meta.e_tag.unwrap(), new_tag);
-
- let options = GetOptions {
- if_match: Some(new_tag),
- ..GetOptions::default()
- };
- storage.get_opts(&path, options).await.unwrap();
-
- let options = GetOptions {
- if_match: Some(tag),
- ..GetOptions::default()
- };
- let err = storage.get_opts(&path, options).await.unwrap_err();
- assert!(matches!(err, Error::Precondition { .. }), "{err}");
-
- if let Some(version) = meta.version {
- storage.put(&path, "bar".into()).await.unwrap();
-
- let options = GetOptions {
- version: Some(version),
- ..GetOptions::default()
- };
-
- // Can retrieve previous version
- let get_opts = storage.get_opts(&path, options).await.unwrap();
- let old = get_opts.bytes().await.unwrap();
- assert_eq!(old, b"test".as_slice());
-
- // Current version contains the updated data
- let current =
storage.get(&path).await.unwrap().bytes().await.unwrap();
- assert_eq!(&current, b"bar".as_slice());
- }
- }
-
- pub(crate) async fn put_opts(storage: &dyn ObjectStore, supports_update:
bool) {
- // When using DynamoCommit repeated runs of this test will produce the
same sequence of records in DynamoDB
- // As a result each conditional operation will need to wait for the
lease to timeout before proceeding
- // One solution would be to clear DynamoDB before each test, but this
would require non-trivial additional code
- // so we instead just generate a random suffix for the filenames
- let rng = thread_rng();
- let suffix =
String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
-
- delete_fixtures(storage).await;
- let path = Path::from(format!("put_opts_{suffix}"));
- let v1 = storage
- .put_opts(&path, "a".into(), PutMode::Create.into())
- .await
- .unwrap();
-
- let err = storage
- .put_opts(&path, "b".into(), PutMode::Create.into())
- .await
- .unwrap_err();
- assert!(matches!(err, Error::AlreadyExists { .. }), "{err}");
-
- let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
- assert_eq!(b.as_ref(), b"a");
-
- if !supports_update {
- return;
- }
-
- let v2 = storage
- .put_opts(&path, "c".into(),
PutMode::Update(v1.clone().into()).into())
- .await
- .unwrap();
-
- let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
- assert_eq!(b.as_ref(), b"c");
-
- let err = storage
- .put_opts(&path, "d".into(), PutMode::Update(v1.into()).into())
- .await
- .unwrap_err();
- assert!(matches!(err, Error::Precondition { .. }), "{err}");
-
- storage
- .put_opts(&path, "e".into(),
PutMode::Update(v2.clone().into()).into())
- .await
- .unwrap();
-
- let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
- assert_eq!(b.as_ref(), b"e");
-
- // Update not exists
- let path = Path::from("I don't exist");
- let err = storage
- .put_opts(&path, "e".into(), PutMode::Update(v2.into()).into())
- .await
- .unwrap_err();
- assert!(matches!(err, Error::Precondition { .. }), "{err}");
-
- const NUM_WORKERS: usize = 5;
- const NUM_INCREMENTS: usize = 10;
-
- let path = Path::from(format!("RACE-{suffix}"));
- let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
- .map(|_| async {
- for _ in 0..NUM_INCREMENTS {
- loop {
- match storage.get(&path).await {
- Ok(r) => {
- let mode = PutMode::Update(UpdateVersion {
- e_tag: r.meta.e_tag.clone(),
- version: r.meta.version.clone(),
- });
-
- let b = r.bytes().await.unwrap();
- let v: usize =
std::str::from_utf8(&b).unwrap().parse().unwrap();
- let new = (v + 1).to_string();
-
- match storage.put_opts(&path, new.into(),
mode.into()).await {
- Ok(_) => break,
- Err(Error::Precondition { .. }) =>
continue,
- Err(e) => return Err(e),
- }
- }
- Err(Error::NotFound { .. }) => {
- let mode = PutMode::Create;
- match storage.put_opts(&path, "1".into(),
mode.into()).await {
- Ok(_) => break,
- Err(Error::AlreadyExists { .. }) =>
continue,
- Err(e) => return Err(e),
- }
- }
- Err(e) => return Err(e),
- }
- }
- }
- Ok(())
- })
- .collect();
-
- while futures.next().await.transpose().unwrap().is_some() {}
- let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
- let v = std::str::from_utf8(&b).unwrap().parse::<usize>().unwrap();
- assert_eq!(v, NUM_WORKERS * NUM_INCREMENTS);
- }
-
- /// Returns a chunk of length `chunk_length`
- fn get_chunk(chunk_length: usize) -> Bytes {
- let mut data = vec![0_u8; chunk_length];
- let mut rng = thread_rng();
- // Set a random selection of bytes
- for _ in 0..1000 {
- data[rng.gen_range(0..chunk_length)] = rng.gen();
- }
- data.into()
- }
-
- /// Returns `num_chunks` of length `chunks`
- fn get_chunks(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
- (0..num_chunks).map(|_| get_chunk(chunk_length)).collect()
- }
-
- pub(crate) async fn stream_get(storage: &DynObjectStore) {
- let location = Path::from("test_dir/test_upload_file.txt");
-
- // Can write to storage
- let data = get_chunks(5 * 1024 * 1024, 3);
- let bytes_expected = data.concat();
- let mut upload = storage.put_multipart(&location).await.unwrap();
- let uploads = data.into_iter().map(|x| upload.put_part(x.into()));
- futures::future::try_join_all(uploads).await.unwrap();
-
- // Object should not yet exist in store
- let meta_res = storage.head(&location).await;
- assert!(meta_res.is_err());
- assert!(matches!(
- meta_res.unwrap_err(),
- crate::Error::NotFound { .. }
- ));
-
- let files = flatten_list_stream(storage, None).await.unwrap();
- assert_eq!(&files, &[]);
-
- let result = storage.list_with_delimiter(None).await.unwrap();
- assert_eq!(&result.objects, &[]);
-
- upload.complete().await.unwrap();
-
- let bytes_written =
storage.get(&location).await.unwrap().bytes().await.unwrap();
- assert_eq!(bytes_expected, bytes_written);
-
- // Can overwrite some storage
- // Sizes chosen to ensure we write three parts
- let data = get_chunks(3_200_000, 7);
- let bytes_expected = data.concat();
- let upload = storage.put_multipart(&location).await.unwrap();
- let mut writer = WriteMultipart::new(upload);
- for chunk in &data {
- writer.write(chunk)
- }
- writer.finish().await.unwrap();
- let bytes_written =
storage.get(&location).await.unwrap().bytes().await.unwrap();
- assert_eq!(bytes_expected, bytes_written);
-
- // We can abort an empty write
- let location = Path::from("test_dir/test_abort_upload.txt");
- let mut upload = storage.put_multipart(&location).await.unwrap();
- upload.abort().await.unwrap();
- let get_res = storage.get(&location).await;
- assert!(get_res.is_err());
- assert!(matches!(
- get_res.unwrap_err(),
- crate::Error::NotFound { .. }
- ));
-
- // We can abort an in-progress write
- let mut upload = storage.put_multipart(&location).await.unwrap();
- upload
- .put_part(data.first().unwrap().clone().into())
- .await
- .unwrap();
-
- upload.abort().await.unwrap();
- let get_res = storage.get(&location).await;
- assert!(get_res.is_err());
- assert!(matches!(
- get_res.unwrap_err(),
- crate::Error::NotFound { .. }
- ));
- }
-
- pub(crate) async fn list_uses_directories_correctly(storage:
&DynObjectStore) {
- delete_fixtures(storage).await;
-
- let content_list = flatten_list_stream(storage, None).await.unwrap();
- assert!(
- content_list.is_empty(),
- "Expected list to be empty; found: {content_list:?}"
- );
-
- let location1 = Path::from("foo/x.json");
- let location2 = Path::from("foo.bar/y.json");
-
- let data = PutPayload::from("arbitrary data");
- storage.put(&location1, data.clone()).await.unwrap();
- storage.put(&location2, data).await.unwrap();
-
- let prefix = Path::from("foo");
- let content_list = flatten_list_stream(storage,
Some(&prefix)).await.unwrap();
- assert_eq!(content_list, &[location1.clone()]);
-
- let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
- assert_eq!(result.objects.len(), 1);
- assert_eq!(result.objects[0].location, location1);
- assert_eq!(result.common_prefixes, &[]);
-
- // Listing an existing path (file) should return an empty list:
- // https://github.com/apache/arrow-rs/issues/3712
- let content_list = flatten_list_stream(storage, Some(&location1))
- .await
- .unwrap();
- assert_eq!(content_list, &[]);
-
- let list =
storage.list_with_delimiter(Some(&location1)).await.unwrap();
- assert_eq!(list.objects, &[]);
- assert_eq!(list.common_prefixes, &[]);
-
- let prefix = Path::from("foo/x");
- let content_list = flatten_list_stream(storage,
Some(&prefix)).await.unwrap();
- assert_eq!(content_list, &[]);
-
- let list = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
- assert_eq!(list.objects, &[]);
- assert_eq!(list.common_prefixes, &[]);
- }
-
- pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) {
- delete_fixtures(storage).await;
-
- // ==================== check: store is empty ====================
- let content_list = flatten_list_stream(storage, None).await.unwrap();
- assert!(content_list.is_empty());
-
- // ==================== do: create files ====================
- let data = Bytes::from("arbitrary data");
-
- let files: Vec<_> = [
- "test_file",
- "mydb/wb/000/000/000.segment",
- "mydb/wb/000/000/001.segment",
- "mydb/wb/000/000/002.segment",
- "mydb/wb/001/001/000.segment",
- "mydb/wb/foo.json",
- "mydb/wbwbwb/111/222/333.segment",
- "mydb/data/whatevs",
- ]
- .iter()
- .map(|&s| Path::from(s))
- .collect();
-
- for f in &files {
- storage.put(f, data.clone().into()).await.unwrap();
- }
-
- // ==================== check: prefix-list `mydb/wb` (directory)
====================
- let prefix = Path::from("mydb/wb");
-
- let expected_000 = Path::from("mydb/wb/000");
- let expected_001 = Path::from("mydb/wb/001");
- let expected_location = Path::from("mydb/wb/foo.json");
-
- let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
-
- assert_eq!(result.common_prefixes, vec![expected_000, expected_001]);
- assert_eq!(result.objects.len(), 1);
-
- let object = &result.objects[0];
-
- assert_eq!(object.location, expected_location);
- assert_eq!(object.size, data.len());
-
- // ==================== check: prefix-list `mydb/wb/000/000/001`
(partial filename doesn't match) ====================
- let prefix = Path::from("mydb/wb/000/000/001");
-
- let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
- assert!(result.common_prefixes.is_empty());
- assert_eq!(result.objects.len(), 0);
-
- // ==================== check: prefix-list `not_there` (non-existing
prefix) ====================
- let prefix = Path::from("not_there");
-
- let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
- assert!(result.common_prefixes.is_empty());
- assert!(result.objects.is_empty());
-
- // ==================== do: remove all files ====================
- for f in &files {
- storage.delete(f).await.unwrap();
- }
-
- // ==================== check: store is empty ====================
- let content_list = flatten_list_stream(storage, None).await.unwrap();
- assert!(content_list.is_empty());
- }
-
- pub(crate) async fn get_nonexistent_object(
- storage: &DynObjectStore,
- location: Option<Path>,
- ) -> crate::Result<Bytes> {
- let location = location.unwrap_or_else(||
Path::from("this_file_should_not_exist"));
-
- let err = storage.head(&location).await.unwrap_err();
- assert!(matches!(err, crate::Error::NotFound { .. }));
-
- storage.get(&location).await?.bytes().await
- }
-
- pub(crate) async fn rename_and_copy(storage: &DynObjectStore) {
- // Create two objects
- let path1 = Path::from("test1");
- let path2 = Path::from("test2");
- let contents1 = Bytes::from("cats");
- let contents2 = Bytes::from("dogs");
-
- // copy() make both objects identical
- storage.put(&path1, contents1.clone().into()).await.unwrap();
- storage.put(&path2, contents2.clone().into()).await.unwrap();
- storage.copy(&path1, &path2).await.unwrap();
- let new_contents =
storage.get(&path2).await.unwrap().bytes().await.unwrap();
- assert_eq!(&new_contents, &contents1);
-
- // rename() copies contents and deletes original
- storage.put(&path1, contents1.clone().into()).await.unwrap();
- storage.put(&path2, contents2.clone().into()).await.unwrap();
- storage.rename(&path1, &path2).await.unwrap();
- let new_contents =
storage.get(&path2).await.unwrap().bytes().await.unwrap();
- assert_eq!(&new_contents, &contents1);
- let result = storage.get(&path1).await;
- assert!(result.is_err());
- assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
-
- // Clean up
- storage.delete(&path2).await.unwrap();
- }
-
- pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) {
- // Create two objects
- let path1 = Path::from("test1");
- let path2 = Path::from("not_exists_nested/test2");
- let contents1 = Bytes::from("cats");
- let contents2 = Bytes::from("dogs");
-
- // copy_if_not_exists() errors if destination already exists
- storage.put(&path1, contents1.clone().into()).await.unwrap();
- storage.put(&path2, contents2.clone().into()).await.unwrap();
- let result = storage.copy_if_not_exists(&path1, &path2).await;
- assert!(result.is_err());
- assert!(matches!(
- result.unwrap_err(),
- crate::Error::AlreadyExists { .. }
- ));
-
- // copy_if_not_exists() copies contents and allows deleting original
- storage.delete(&path2).await.unwrap();
- storage.copy_if_not_exists(&path1, &path2).await.unwrap();
- storage.delete(&path1).await.unwrap();
- let new_contents =
storage.get(&path2).await.unwrap().bytes().await.unwrap();
- assert_eq!(&new_contents, &contents1);
- let result = storage.get(&path1).await;
- assert!(result.is_err());
- assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
-
- // Clean up
- storage.delete(&path2).await.unwrap();
- }
-
- pub(crate) async fn copy_rename_nonexistent_object(storage:
&DynObjectStore) {
- // Create empty source object
- let path1 = Path::from("test1");
-
- // Create destination object
- let path2 = Path::from("test2");
- storage.put(&path2, "hello".into()).await.unwrap();
-
- // copy() errors if source does not exist
- let result = storage.copy(&path1, &path2).await;
- assert!(result.is_err());
- assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
-
- // rename() errors if source does not exist
- let result = storage.rename(&path1, &path2).await;
- assert!(result.is_err());
- assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
-
- // copy_if_not_exists() errors if source does not exist
- let result = storage.copy_if_not_exists(&path1, &path2).await;
- assert!(result.is_err());
- assert!(matches!(result.unwrap_err(), crate::Error::NotFound { .. }));
-
- // Clean up
- storage.delete(&path2).await.unwrap();
- }
-
- pub(crate) async fn multipart(storage: &dyn ObjectStore, multipart: &dyn
MultipartStore) {
- let path = Path::from("test_multipart");
- let chunk_size = 5 * 1024 * 1024;
-
- let chunks = get_chunks(chunk_size, 2);
-
- let id = multipart.create_multipart(&path).await.unwrap();
-
- let parts: Vec<_> = futures::stream::iter(chunks)
- .enumerate()
- .map(|(idx, b)| multipart.put_part(&path, &id, idx, b.into()))
- .buffered(2)
- .try_collect()
- .await
- .unwrap();
-
- multipart
- .complete_multipart(&path, &id, parts)
- .await
- .unwrap();
-
- let meta = storage.head(&path).await.unwrap();
- assert_eq!(meta.size, chunk_size * 2);
-
- // Empty case
- let path = Path::from("test_empty_multipart");
-
- let id = multipart.create_multipart(&path).await.unwrap();
-
- let parts = vec![];
-
- multipart
- .complete_multipart(&path, &id, parts)
- .await
- .unwrap();
-
- let meta = storage.head(&path).await.unwrap();
- assert_eq!(meta.size, 0);
+ /// Test that the returned stream does not borrow the lifetime of Path
+ fn list_store<'a>(
+ store: &'a dyn ObjectStore,
+ path_str: &str,
+ ) -> BoxStream<'a, Result<ObjectMeta>> {
+ let path = Path::from(path_str);
+ store.list(Some(&path))
}
#[cfg(any(feature = "azure", feature = "aws"))]
- pub(crate) async fn signing<T>(integration: &T)
+ pub async fn signing<T>(integration: &T)
where
- T: ObjectStore + crate::signer::Signer,
+ T: ObjectStore + signer::Signer,
{
use reqwest::Method;
use std::time::Duration;
@@ -2367,7 +1337,7 @@ mod tests {
}
#[cfg(any(feature = "aws", feature = "azure"))]
- pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>,
validate: bool, get_tags: F)
+ pub async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate:
bool, get_tags: F)
where
F: Fn(Path) -> Fut + Send + Sync,
Fut: std::future::Future<Output = Result<reqwest::Response>> + Send,
@@ -2444,24 +1414,6 @@ mod tests {
}
}
- async fn delete_fixtures(storage: &DynObjectStore) {
- let paths = storage.list(None).map_ok(|meta| meta.location).boxed();
- storage
- .delete_stream(paths)
- .try_collect::<Vec<_>>()
- .await
- .unwrap();
- }
-
- /// Test that the returned stream does not borrow the lifetime of Path
- fn list_store<'a>(
- store: &'a dyn ObjectStore,
- path_str: &str,
- ) -> BoxStream<'a, Result<ObjectMeta>> {
- let path = Path::from(path_str);
- store.list(Some(&path))
- }
-
#[tokio::test]
async fn test_list_lifetimes() {
let store = memory::InMemory::new();
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index f3e1d4296fe..64b96ad1a96 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -273,9 +273,9 @@ impl MultipartUpload for LimitUpload {
#[cfg(test)]
mod tests {
+ use crate::integration::*;
use crate::limit::LimitStore;
use crate::memory::InMemory;
- use crate::tests::*;
use crate::ObjectStore;
use futures::stream::StreamExt;
use std::pin::Pin;
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 8dec5bee0a2..95b50d6743f 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -1005,8 +1005,7 @@ mod tests {
use futures::TryStreamExt;
use tempfile::{NamedTempFile, TempDir};
- use crate::test_util::flatten_list_stream;
- use crate::tests::*;
+ use crate::integration::*;
use super::*;
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index daf14e17510..0d72983b049 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -528,7 +528,7 @@ impl MultipartUpload for InMemoryUpload {
#[cfg(test)]
mod tests {
- use crate::tests::*;
+ use crate::integration::*;
use super::*;
diff --git a/object_store/src/parse.rs b/object_store/src/parse.rs
index 5549fd3a3e5..e5d5149fb7a 100644
--- a/object_store/src/parse.rs
+++ b/object_store/src/parse.rs
@@ -25,15 +25,9 @@ use url::Url;
#[derive(Debug, Snafu)]
enum Error {
- #[snafu(display("Unable to convert URL \"{}\" to filesystem path", url))]
- InvalidUrl { url: Url },
-
#[snafu(display("Unable to recognise URL \"{}\"", url))]
Unrecognised { url: Url },
- #[snafu(display("Feature {scheme:?} not enabled"))]
- NotEnabled { scheme: ObjectStoreScheme },
-
#[snafu(context(false))]
Path { source: crate::path::Error },
}
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 7c9ea5804c3..9b10fea5e0b 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -204,9 +204,8 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::integration::*;
use crate::local::LocalFileSystem;
- use crate::test_util::flatten_list_stream;
- use crate::tests::*;
use tempfile::TempDir;
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index 38b6d7c3bf4..d07276c3dca 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -398,7 +398,7 @@ impl MultipartUpload for ThrottledUpload {
#[cfg(test)]
mod tests {
use super::*;
- use crate::{memory::InMemory, tests::*, GetResultPayload};
+ use crate::{integration::*, memory::InMemory, GetResultPayload};
use futures::TryStreamExt;
use tokio::time::Duration;
use tokio::time::Instant;