This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 62ca5f37d1 Split aws Module (#4953)
62ca5f37d1 is described below
commit 62ca5f37d143db172a73b3f0365f48f8bc3e2c72
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Oct 19 13:40:49 2023 +0100
Split aws Module (#4953)
* Split aws module
* Clippy
* Fix doc
---
object_store/src/aws/{mod.rs => builder.rs} | 527 ++----------
object_store/src/aws/mod.rs | 1169 +--------------------------
object_store/src/aws/resolve.rs | 106 +++
3 files changed, 172 insertions(+), 1630 deletions(-)
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/builder.rs
similarity index 73%
copy from object_store/src/aws/mod.rs
copy to object_store/src/aws/builder.rs
index 6d5aecea2d..422ba15efa 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/builder.rs
@@ -15,80 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-//! An object store implementation for S3
-//!
-//! ## Multi-part uploads
-//!
-//! Multi-part uploads can be initiated with the [ObjectStore::put_multipart]
method.
-//! Data passed to the writer is automatically buffered to meet the minimum
size
-//! requirements for a part. Multiple parts are uploaded concurrently.
-//!
-//! If the writer fails for any reason, you may have parts uploaded to AWS but
not
-//! used that you may be charged for. Use the [ObjectStore::abort_multipart]
method
-//! to abort the upload and drop those unneeded parts. In addition, you may
wish to
-//! consider implementing [automatic cleanup] of unused parts that are older
than one
-//! week.
-//!
-//! [automatic cleanup]:
https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
-
-use async_trait::async_trait;
-use bytes::Bytes;
-use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
-use itertools::Itertools;
-use reqwest::Method;
-use serde::{Deserialize, Serialize};
-use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use std::{str::FromStr, sync::Arc, time::Duration};
-use tokio::io::AsyncWrite;
-use tracing::info;
-use url::Url;
-
use crate::aws::client::{S3Client, S3Config};
use crate::aws::credential::{
InstanceCredentialProvider, TaskCredentialProvider, WebIdentityProvider,
};
-use crate::client::get::GetClientExt;
-use crate::client::list::ListClientExt;
-use crate::client::{
- ClientConfigKey, CredentialProvider, StaticCredentialProvider,
- TokenCredentialProvider,
+use crate::aws::{
+ AmazonS3, AwsCredential, AwsCredentialProvider, Checksum,
S3CopyIfNotExists, STORE,
};
+use crate::client::TokenCredentialProvider;
use crate::config::ConfigValue;
-use crate::multipart::{PartId, PutPart, WriteMultiPart};
-use crate::signer::Signer;
use crate::{
- ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
- ObjectStore, Path, PutResult, Result, RetryConfig,
+ ClientConfigKey, ClientOptions, Result, RetryConfig,
StaticCredentialProvider,
};
-
-mod checksum;
-mod client;
-mod copy;
-mod credential;
-
-pub use checksum::Checksum;
-pub use copy::S3CopyIfNotExists;
-
-//
http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
-//
-// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
-// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~
).
-pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet =
- percent_encoding::NON_ALPHANUMERIC
- .remove(b'-')
- .remove(b'.')
- .remove(b'_')
- .remove(b'~');
-
-/// This struct is used to maintain the URI path encoding
-const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet =
STRICT_ENCODE_SET.remove(b'/');
-
-const STORE: &str = "S3";
-
-/// [`CredentialProvider`] for [`AmazonS3`]
-pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential =
AwsCredential>>;
-pub use credential::{AwsAuthorizer, AwsCredential};
+use itertools::Itertools;
+use serde::{Deserialize, Serialize};
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::str::FromStr;
+use std::sync::Arc;
+use tracing::info;
+use url::Url;
/// Default metadata endpoint
static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254";
@@ -140,7 +85,7 @@ enum Error {
RegionParse { bucket: String },
}
-impl From<Error> for super::Error {
+impl From<Error> for crate::Error {
fn from(source: Error) -> Self {
match source {
Error::UnknownConfigurationKey { key } => {
@@ -154,233 +99,6 @@ impl From<Error> for super::Error {
}
}
-/// Get the bucket region using the [HeadBucket API]. This will fail if the
bucket does not exist.
-///
-/// [HeadBucket API]:
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
-pub async fn resolve_bucket_region(
- bucket: &str,
- client_options: &ClientOptions,
-) -> Result<String> {
- use reqwest::StatusCode;
-
- let endpoint = format!("https://{}.s3.amazonaws.com", bucket);
-
- let client = client_options.client()?;
-
- let response = client
- .head(&endpoint)
- .send()
- .await
- .context(ResolveRegionSnafu { bucket })?;
-
- ensure!(
- response.status() != StatusCode::NOT_FOUND,
- BucketNotFoundSnafu { bucket }
- );
-
- let region = response
- .headers()
- .get("x-amz-bucket-region")
- .and_then(|x| x.to_str().ok())
- .context(RegionParseSnafu { bucket })?;
-
- Ok(region.to_string())
-}
-
-/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
-#[derive(Debug)]
-pub struct AmazonS3 {
- client: Arc<S3Client>,
-}
-
-impl std::fmt::Display for AmazonS3 {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "AmazonS3({})", self.client.config().bucket)
- }
-}
-
-impl AmazonS3 {
- /// Returns the [`AwsCredentialProvider`] used by [`AmazonS3`]
- pub fn credentials(&self) -> &AwsCredentialProvider {
- &self.client.config().credentials
- }
-
- /// Create a full URL to the resource specified by `path` with this
instance's configuration.
- fn path_url(&self, path: &Path) -> String {
- self.client.config().path_url(path)
- }
-}
-
-#[async_trait]
-impl Signer for AmazonS3 {
- /// Create a URL containing the relevant [AWS SigV4] query parameters that
authorize a request
- /// via `method` to the resource at `path` valid for the duration
specified in `expires_in`.
- ///
- /// [AWS SigV4]:
https://docs.aws.amazon.com/IAM/latest/UserGuide/create-signed-request.html
- ///
- /// # Example
- ///
- /// This example returns a URL that will enable a user to upload a file to
- /// "some-folder/some-file.txt" in the next hour.
- ///
- /// ```
- /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
- /// # use object_store::{aws::AmazonS3Builder, path::Path, signer::Signer};
- /// # use reqwest::Method;
- /// # use std::time::Duration;
- /// #
- /// let region = "us-east-1";
- /// let s3 = AmazonS3Builder::new()
- /// .with_region(region)
- /// .with_bucket_name("my-bucket")
- /// .with_access_key_id("my-access-key-id")
- /// .with_secret_access_key("my-secret-access-key")
- /// .build()?;
- ///
- /// let url = s3.signed_url(
- /// Method::PUT,
- /// &Path::from("some-folder/some-file.txt"),
- /// Duration::from_secs(60 * 60)
- /// ).await?;
- /// # Ok(())
- /// # }
- /// ```
- async fn signed_url(
- &self,
- method: Method,
- path: &Path,
- expires_in: Duration,
- ) -> Result<Url> {
- let credential = self.credentials().get_credential().await?;
- let authorizer =
- AwsAuthorizer::new(&credential, "s3",
&self.client.config().region);
-
- let path_url = self.path_url(path);
- let mut url =
- Url::parse(&path_url).context(UnableToParseUrlSnafu { url:
path_url })?;
-
- authorizer.sign(method, &mut url, expires_in);
-
- Ok(url)
- }
-}
-
-#[async_trait]
-impl ObjectStore for AmazonS3 {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
- let e_tag = self.client.put_request(location, bytes, &()).await?;
- Ok(PutResult { e_tag: Some(e_tag) })
- }
-
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- let id = self.client.create_multipart(location).await?;
-
- let upload = S3MultiPartUpload {
- location: location.clone(),
- upload_id: id.clone(),
- client: Arc::clone(&self.client),
- };
-
- Ok((id, Box::new(WriteMultiPart::new(upload, 8))))
- }
-
- async fn abort_multipart(
- &self,
- location: &Path,
- multipart_id: &MultipartId,
- ) -> Result<()> {
- self.client
- .delete_request(location, &[("uploadId", multipart_id)])
- .await
- }
-
- async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
- self.client.get_opts(location, options).await
- }
-
- async fn delete(&self, location: &Path) -> Result<()> {
- self.client.delete_request(location, &()).await
- }
-
- fn delete_stream<'a>(
- &'a self,
- locations: BoxStream<'a, Result<Path>>,
- ) -> BoxStream<'a, Result<Path>> {
- locations
- .try_chunks(1_000)
- .map(move |locations| async {
- // Early return the error. We ignore the paths that have
already been
- // collected into the chunk.
- let locations = locations.map_err(|e| e.1)?;
- self.client
- .bulk_delete_request(locations)
- .await
- .map(futures::stream::iter)
- })
- .buffered(20)
- .try_flatten()
- .boxed()
- }
-
- fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
- self.client.list(prefix)
- }
-
- fn list_with_offset(
- &self,
- prefix: Option<&Path>,
- offset: &Path,
- ) -> BoxStream<'_, Result<ObjectMeta>> {
- self.client.list_with_offset(prefix, offset)
- }
-
- async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
- self.client.list_with_delimiter(prefix).await
- }
-
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- self.client.copy_request(from, to, true).await
- }
-
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- self.client.copy_request(from, to, false).await
- }
-}
-
-struct S3MultiPartUpload {
- location: Path,
- upload_id: String,
- client: Arc<S3Client>,
-}
-
-#[async_trait]
-impl PutPart for S3MultiPartUpload {
- async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
- let part = (part_idx + 1).to_string();
-
- let content_id = self
- .client
- .put_request(
- &self.location,
- buf.into(),
- &[("partNumber", &part), ("uploadId", &self.upload_id)],
- )
- .await?;
-
- Ok(PartId { content_id })
- }
-
- async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
- self.client
- .complete_multipart(&self.location, &self.upload_id,
completed_parts)
- .await?;
- Ok(())
- }
-}
-
/// Configure a connection to Amazon S3 using the specified credentials in
/// the specified Amazon region and bucket.
///
@@ -567,7 +285,7 @@ pub enum AmazonS3ConfigKey {
///
<https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
ContainerCredentialsRelativeUri,
- /// Configure how to provide [`ObjectStore::copy_if_not_exists`]
+ /// Configure how to provide `copy_if_not_exists`
///
/// See [`S3CopyIfNotExists`]
CopyIfNotExists,
@@ -605,7 +323,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
}
impl FromStr for AmazonS3ConfigKey {
- type Err = super::Error;
+ type Err = crate::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
@@ -1026,7 +744,7 @@ impl AmazonS3Builder {
self
}
- /// Configure how to provide [`ObjectStore::copy_if_not_exists`]
+ /// Configure how to provide `copy_if_not_exists`
pub fn with_copy_if_not_exists(mut self, config: S3CopyIfNotExists) ->
Self {
self.copy_if_not_exists = Some(config.into());
self
@@ -1160,16 +878,8 @@ impl AmazonS3Builder {
#[cfg(test)]
mod tests {
use super::*;
- use crate::tests::{
- copy_if_not_exists, get_nonexistent_object, get_opts,
- list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list_opts,
- rename_and_copy, stream_get,
- };
- use bytes::Bytes;
use std::collections::HashMap;
- const NON_EXISTENT_NAME: &str = "nonexistentname";
-
#[test]
fn s3_test_config_from_map() {
let aws_access_key_id = "object_store:fake_access_key_id".to_string();
@@ -1258,130 +968,6 @@ mod tests {
);
}
- #[tokio::test]
- async fn s3_test() {
- crate::test_util::maybe_skip_integration!();
- let config = AmazonS3Builder::from_env();
-
- let is_local = matches!(&config.endpoint, Some(e) if
e.starts_with("http://"));
- let test_not_exists = config.copy_if_not_exists.is_some();
- let integration = config.build().unwrap();
-
- // Localstack doesn't support listing with spaces
https://github.com/localstack/localstack/issues/6328
- put_get_delete_list_opts(&integration, is_local).await;
- get_opts(&integration).await;
- list_uses_directories_correctly(&integration).await;
- list_with_delimiter(&integration).await;
- rename_and_copy(&integration).await;
- stream_get(&integration).await;
- if test_not_exists {
- copy_if_not_exists(&integration).await;
- }
-
- // run integration test with unsigned payload enabled
- let config = AmazonS3Builder::from_env().with_unsigned_payload(true);
- let is_local = matches!(&config.endpoint, Some(e) if
e.starts_with("http://"));
- let integration = config.build().unwrap();
- put_get_delete_list_opts(&integration, is_local).await;
-
- // run integration test with checksum set to sha256
- let config =
-
AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
- let is_local = matches!(&config.endpoint, Some(e) if
e.starts_with("http://"));
- let integration = config.build().unwrap();
- put_get_delete_list_opts(&integration, is_local).await;
- }
-
- #[tokio::test]
- async fn s3_test_get_nonexistent_location() {
- crate::test_util::maybe_skip_integration!();
- let integration = AmazonS3Builder::from_env().build().unwrap();
-
- let location = Path::from_iter([NON_EXISTENT_NAME]);
-
- let err = get_nonexistent_object(&integration, Some(location))
- .await
- .unwrap_err();
- assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
- }
-
- #[tokio::test]
- async fn s3_test_get_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
- let config =
AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
- let integration = config.build().unwrap();
-
- let location = Path::from_iter([NON_EXISTENT_NAME]);
-
- let err = integration.get(&location).await.unwrap_err();
- assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
- }
-
- #[tokio::test]
- async fn s3_test_put_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
- let config =
AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
- let integration = config.build().unwrap();
-
- let location = Path::from_iter([NON_EXISTENT_NAME]);
- let data = Bytes::from("arbitrary data");
-
- let err = integration.put(&location, data).await.unwrap_err();
- assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
- }
-
- #[tokio::test]
- async fn s3_test_delete_nonexistent_location() {
- crate::test_util::maybe_skip_integration!();
- let integration = AmazonS3Builder::from_env().build().unwrap();
-
- let location = Path::from_iter([NON_EXISTENT_NAME]);
-
- integration.delete(&location).await.unwrap();
- }
-
- #[tokio::test]
- async fn s3_test_delete_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
- let config =
AmazonS3Builder::from_env().with_bucket_name(NON_EXISTENT_NAME);
- let integration = config.build().unwrap();
-
- let location = Path::from_iter([NON_EXISTENT_NAME]);
-
- let err = integration.delete(&location).await.unwrap_err();
- assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
- }
-
- #[tokio::test]
- async fn s3_test_proxy_url() {
- let s3 = AmazonS3Builder::new()
- .with_access_key_id("access_key_id")
- .with_secret_access_key("secret_access_key")
- .with_region("region")
- .with_bucket_name("bucket_name")
- .with_allow_http(true)
- .with_proxy_url("https://example.com")
- .build();
-
- assert!(s3.is_ok());
-
- let err = AmazonS3Builder::new()
- .with_access_key_id("access_key_id")
- .with_secret_access_key("secret_access_key")
- .with_region("region")
- .with_bucket_name("bucket_name")
- .with_allow_http(true)
- .with_proxy_url("asdf://example.com")
- .build()
- .unwrap_err()
- .to_string();
-
- assert_eq!(
- "Generic HTTP client error: builder error: unknown proxy scheme",
- err
- );
- }
-
#[test]
fn s3_test_urls() {
let mut builder = AmazonS3Builder::new();
@@ -1451,6 +1037,36 @@ mod tests {
}
}
+ #[tokio::test]
+ async fn s3_test_proxy_url() {
+ let s3 = AmazonS3Builder::new()
+ .with_access_key_id("access_key_id")
+ .with_secret_access_key("secret_access_key")
+ .with_region("region")
+ .with_bucket_name("bucket_name")
+ .with_allow_http(true)
+ .with_proxy_url("https://example.com")
+ .build();
+
+ assert!(s3.is_ok());
+
+ let err = AmazonS3Builder::new()
+ .with_access_key_id("access_key_id")
+ .with_secret_access_key("secret_access_key")
+ .with_region("region")
+ .with_bucket_name("bucket_name")
+ .with_allow_http(true)
+ .with_proxy_url("asdf://example.com")
+ .build()
+ .unwrap_err()
+ .to_string();
+
+ assert_eq!(
+ "Generic HTTP client error: builder error: unknown proxy scheme",
+ err
+ );
+ }
+
#[test]
fn test_invalid_config() {
let err = AmazonS3Builder::new()
@@ -1480,56 +1096,3 @@ mod tests {
);
}
}
-
-#[cfg(test)]
-mod s3_resolve_bucket_region_tests {
- use super::*;
-
- #[tokio::test]
- async fn test_private_bucket() {
- let bucket = "bloxbender";
-
- let region = resolve_bucket_region(bucket, &ClientOptions::new())
- .await
- .unwrap();
-
- let expected = "us-west-2".to_string();
-
- assert_eq!(region, expected);
- }
-
- #[tokio::test]
- async fn test_bucket_does_not_exist() {
- let bucket = "please-dont-exist";
-
- let result = resolve_bucket_region(bucket,
&ClientOptions::new()).await;
-
- assert!(result.is_err());
- }
-
- #[tokio::test]
- #[ignore = "Tests shouldn't call use remote services by default"]
- async fn test_disable_creds() {
- // https://registry.opendata.aws/daylight-osm/
- let v1 = AmazonS3Builder::new()
- .with_bucket_name("daylight-map-distribution")
- .with_region("us-west-1")
- .with_access_key_id("local")
- .with_secret_access_key("development")
- .build()
- .unwrap();
-
- let prefix = Path::from("release");
-
- v1.list_with_delimiter(Some(&prefix)).await.unwrap_err();
-
- let v2 = AmazonS3Builder::new()
- .with_bucket_name("daylight-map-distribution")
- .with_region("us-west-1")
- .with_skip_signature(true)
- .build()
- .unwrap();
-
- v2.list_with_delimiter(Some(&prefix)).await.unwrap();
- }
-}
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 6d5aecea2d..a4e39c3b88 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -35,40 +35,33 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
-use itertools::Itertools;
use reqwest::Method;
-use serde::{Deserialize, Serialize};
-use snafu::{ensure, OptionExt, ResultExt, Snafu};
-use std::{str::FromStr, sync::Arc, time::Duration};
+use std::{sync::Arc, time::Duration};
use tokio::io::AsyncWrite;
-use tracing::info;
use url::Url;
-use crate::aws::client::{S3Client, S3Config};
-use crate::aws::credential::{
- InstanceCredentialProvider, TaskCredentialProvider, WebIdentityProvider,
-};
+use crate::aws::client::S3Client;
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
-use crate::client::{
- ClientConfigKey, CredentialProvider, StaticCredentialProvider,
- TokenCredentialProvider,
-};
-use crate::config::ConfigValue;
+use crate::client::CredentialProvider;
use crate::multipart::{PartId, PutPart, WriteMultiPart};
use crate::signer::Signer;
use crate::{
- ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
- ObjectStore, Path, PutResult, Result, RetryConfig,
+ GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
Path,
+ PutResult, Result,
};
+mod builder;
mod checksum;
mod client;
mod copy;
mod credential;
+mod resolve;
+pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
pub use checksum::Checksum;
pub use copy::S3CopyIfNotExists;
+pub use resolve::resolve_bucket_region;
//
http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
//
@@ -90,103 +83,6 @@ const STORE: &str = "S3";
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential =
AwsCredential>>;
pub use credential::{AwsAuthorizer, AwsCredential};
-/// Default metadata endpoint
-static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254";
-
-/// A specialized `Error` for object store-related errors
-#[derive(Debug, Snafu)]
-#[allow(missing_docs)]
-enum Error {
- #[snafu(display("Missing region"))]
- MissingRegion,
-
- #[snafu(display("Missing bucket name"))]
- MissingBucketName,
-
- #[snafu(display("Missing AccessKeyId"))]
- MissingAccessKeyId,
-
- #[snafu(display("Missing SecretAccessKey"))]
- MissingSecretAccessKey,
-
- #[snafu(display("Unable parse source url. Url: {}, Error: {}", url,
source))]
- UnableToParseUrl {
- source: url::ParseError,
- url: String,
- },
-
- #[snafu(display(
- "Unknown url scheme cannot be parsed into storage location: {}",
- scheme
- ))]
- UnknownUrlScheme { scheme: String },
-
- #[snafu(display("URL did not match any known pattern for scheme: {}",
url))]
- UrlNotRecognised { url: String },
-
- #[snafu(display("Configuration key: '{}' is not known.", key))]
- UnknownConfigurationKey { key: String },
-
- #[snafu(display("Bucket '{}' not found", bucket))]
- BucketNotFound { bucket: String },
-
- #[snafu(display("Failed to resolve region for bucket '{}'", bucket))]
- ResolveRegion {
- bucket: String,
- source: reqwest::Error,
- },
-
- #[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
- RegionParse { bucket: String },
-}
-
-impl From<Error> for super::Error {
- fn from(source: Error) -> Self {
- match source {
- Error::UnknownConfigurationKey { key } => {
- Self::UnknownConfigurationKey { store: STORE, key }
- }
- _ => Self::Generic {
- store: STORE,
- source: Box::new(source),
- },
- }
- }
-}
-
-/// Get the bucket region using the [HeadBucket API]. This will fail if the
bucket does not exist.
-///
-/// [HeadBucket API]:
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
-pub async fn resolve_bucket_region(
- bucket: &str,
- client_options: &ClientOptions,
-) -> Result<String> {
- use reqwest::StatusCode;
-
- let endpoint = format!("https://{}.s3.amazonaws.com", bucket);
-
- let client = client_options.client()?;
-
- let response = client
- .head(&endpoint)
- .send()
- .await
- .context(ResolveRegionSnafu { bucket })?;
-
- ensure!(
- response.status() != StatusCode::NOT_FOUND,
- BucketNotFoundSnafu { bucket }
- );
-
- let region = response
- .headers()
- .get("x-amz-bucket-region")
- .and_then(|x| x.to_str().ok())
- .context(RegionParseSnafu { bucket })?;
-
- Ok(region.to_string())
-}
-
/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
#[derive(Debug)]
pub struct AmazonS3 {
@@ -256,8 +152,10 @@ impl Signer for AmazonS3 {
AwsAuthorizer::new(&credential, "s3",
&self.client.config().region);
let path_url = self.path_url(path);
- let mut url =
- Url::parse(&path_url).context(UnableToParseUrlSnafu { url:
path_url })?;
+ let mut url = Url::parse(&path_url).map_err(|e| crate::Error::Generic {
+ store: STORE,
+ source: format!("Unable to parse url {path_url}: {e}").into(),
+ })?;
authorizer.sign(method, &mut url, expires_in);
@@ -381,891 +279,23 @@ impl PutPart for S3MultiPartUpload {
}
}
-/// Configure a connection to Amazon S3 using the specified credentials in
-/// the specified Amazon region and bucket.
-///
-/// # Example
-/// ```
-/// # let REGION = "foo";
-/// # let BUCKET_NAME = "foo";
-/// # let ACCESS_KEY_ID = "foo";
-/// # let SECRET_KEY = "foo";
-/// # use object_store::aws::AmazonS3Builder;
-/// let s3 = AmazonS3Builder::new()
-/// .with_region(REGION)
-/// .with_bucket_name(BUCKET_NAME)
-/// .with_access_key_id(ACCESS_KEY_ID)
-/// .with_secret_access_key(SECRET_KEY)
-/// .build();
-/// ```
-#[derive(Debug, Default, Clone)]
-pub struct AmazonS3Builder {
- /// Access key id
- access_key_id: Option<String>,
- /// Secret access_key
- secret_access_key: Option<String>,
- /// Region
- region: Option<String>,
- /// Bucket name
- bucket_name: Option<String>,
- /// Endpoint for communicating with AWS S3
- endpoint: Option<String>,
- /// Token to use for requests
- token: Option<String>,
- /// Url
- url: Option<String>,
- /// Retry config
- retry_config: RetryConfig,
- /// When set to true, fallback to IMDSv1
- imdsv1_fallback: ConfigValue<bool>,
- /// When set to true, virtual hosted style request has to be used
- virtual_hosted_style_request: ConfigValue<bool>,
- /// When set to true, unsigned payload option has to be used
- unsigned_payload: ConfigValue<bool>,
- /// Checksum algorithm which has to be used for object integrity check
during upload
- checksum_algorithm: Option<ConfigValue<Checksum>>,
- /// Metadata endpoint, see
<https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html>
- metadata_endpoint: Option<String>,
- /// Container credentials URL, see
<https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
- container_credentials_relative_uri: Option<String>,
- /// Client options
- client_options: ClientOptions,
- /// Credentials
- credentials: Option<AwsCredentialProvider>,
- /// Skip signing requests
- skip_signature: ConfigValue<bool>,
- /// Copy if not exists
- copy_if_not_exists: Option<ConfigValue<S3CopyIfNotExists>>,
-}
-
-/// Configuration keys for [`AmazonS3Builder`]
-///
-/// Configuration via keys can be done via [`AmazonS3Builder::with_config`]
-///
-/// # Example
-/// ```
-/// # use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
-/// let builder = AmazonS3Builder::new()
-/// .with_config("aws_access_key_id".parse().unwrap(), "my-access-key-id")
-/// .with_config(AmazonS3ConfigKey::DefaultRegion, "my-default-region");
-/// ```
-#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy, Serialize, Deserialize)]
-#[non_exhaustive]
-pub enum AmazonS3ConfigKey {
- /// AWS Access Key
- ///
- /// See [`AmazonS3Builder::with_access_key_id`] for details.
- ///
- /// Supported keys:
- /// - `aws_access_key_id`
- /// - `access_key_id`
- AccessKeyId,
-
- /// Secret Access Key
- ///
- /// See [`AmazonS3Builder::with_secret_access_key`] for details.
- ///
- /// Supported keys:
- /// - `aws_secret_access_key`
- /// - `secret_access_key`
- SecretAccessKey,
-
- /// Region
- ///
- /// See [`AmazonS3Builder::with_region`] for details.
- ///
- /// Supported keys:
- /// - `aws_region`
- /// - `region`
- Region,
-
- /// Default region
- ///
- /// See [`AmazonS3Builder::with_region`] for details.
- ///
- /// Supported keys:
- /// - `aws_default_region`
- /// - `default_region`
- DefaultRegion,
-
- /// Bucket name
- ///
- /// See [`AmazonS3Builder::with_bucket_name`] for details.
- ///
- /// Supported keys:
- /// - `aws_bucket`
- /// - `aws_bucket_name`
- /// - `bucket`
- /// - `bucket_name`
- Bucket,
-
- /// Sets custom endpoint for communicating with AWS S3.
- ///
- /// See [`AmazonS3Builder::with_endpoint`] for details.
- ///
- /// Supported keys:
- /// - `aws_endpoint`
- /// - `aws_endpoint_url`
- /// - `endpoint`
- /// - `endpoint_url`
- Endpoint,
-
- /// Token to use for requests (passed to underlying provider)
- ///
- /// See [`AmazonS3Builder::with_token`] for details.
- ///
- /// Supported keys:
- /// - `aws_session_token`
- /// - `aws_token`
- /// - `session_token`
- /// - `token`
- Token,
-
- /// Fall back to ImdsV1
- ///
- /// See [`AmazonS3Builder::with_imdsv1_fallback`] for details.
- ///
- /// Supported keys:
- /// - `aws_imdsv1_fallback`
- /// - `imdsv1_fallback`
- ImdsV1Fallback,
-
- /// If virtual hosted style request has to be used
- ///
- /// See [`AmazonS3Builder::with_virtual_hosted_style_request`] for details.
- ///
- /// Supported keys:
- /// - `aws_virtual_hosted_style_request`
- /// - `virtual_hosted_style_request`
- VirtualHostedStyleRequest,
-
- /// Avoid computing payload checksum when calculating signature.
- ///
- /// See [`AmazonS3Builder::with_unsigned_payload`] for details.
- ///
- /// Supported keys:
- /// - `aws_unsigned_payload`
- /// - `unsigned_payload`
- UnsignedPayload,
-
- /// Set the checksum algorithm for this client
- ///
- /// See [`AmazonS3Builder::with_checksum_algorithm`]
- Checksum,
-
- /// Set the instance metadata endpoint
- ///
- /// See [`AmazonS3Builder::with_metadata_endpoint`] for details.
- ///
- /// Supported keys:
- /// - `aws_metadata_endpoint`
- /// - `metadata_endpoint`
- MetadataEndpoint,
-
- /// Set the container credentials relative URI
- ///
- ///
<https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
- ContainerCredentialsRelativeUri,
-
- /// Configure how to provide [`ObjectStore::copy_if_not_exists`]
- ///
- /// See [`S3CopyIfNotExists`]
- CopyIfNotExists,
-
- /// Skip signing request
- SkipSignature,
-
- /// Client options
- Client(ClientConfigKey),
-}
-
-impl AsRef<str> for AmazonS3ConfigKey {
- fn as_ref(&self) -> &str {
- match self {
- Self::AccessKeyId => "aws_access_key_id",
- Self::SecretAccessKey => "aws_secret_access_key",
- Self::Region => "aws_region",
- Self::Bucket => "aws_bucket",
- Self::Endpoint => "aws_endpoint",
- Self::Token => "aws_session_token",
- Self::ImdsV1Fallback => "aws_imdsv1_fallback",
- Self::VirtualHostedStyleRequest =>
"aws_virtual_hosted_style_request",
- Self::DefaultRegion => "aws_default_region",
- Self::MetadataEndpoint => "aws_metadata_endpoint",
- Self::UnsignedPayload => "aws_unsigned_payload",
- Self::Checksum => "aws_checksum_algorithm",
- Self::ContainerCredentialsRelativeUri => {
- "aws_container_credentials_relative_uri"
- }
- Self::SkipSignature => "aws_skip_signature",
- Self::CopyIfNotExists => "copy_if_not_exists",
- Self::Client(opt) => opt.as_ref(),
- }
- }
-}
-
-impl FromStr for AmazonS3ConfigKey {
- type Err = super::Error;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s {
- "aws_access_key_id" | "access_key_id" => Ok(Self::AccessKeyId),
- "aws_secret_access_key" | "secret_access_key" =>
Ok(Self::SecretAccessKey),
- "aws_default_region" | "default_region" => Ok(Self::DefaultRegion),
- "aws_region" | "region" => Ok(Self::Region),
- "aws_bucket" | "aws_bucket_name" | "bucket_name" | "bucket" => {
- Ok(Self::Bucket)
- }
- "aws_endpoint_url" | "aws_endpoint" | "endpoint_url" | "endpoint"
=> {
- Ok(Self::Endpoint)
- }
- "aws_session_token" | "aws_token" | "session_token" | "token" => {
- Ok(Self::Token)
- }
- "aws_virtual_hosted_style_request" |
"virtual_hosted_style_request" => {
- Ok(Self::VirtualHostedStyleRequest)
- }
- "aws_imdsv1_fallback" | "imdsv1_fallback" =>
Ok(Self::ImdsV1Fallback),
- "aws_metadata_endpoint" | "metadata_endpoint" =>
Ok(Self::MetadataEndpoint),
- "aws_unsigned_payload" | "unsigned_payload" =>
Ok(Self::UnsignedPayload),
- "aws_checksum_algorithm" | "checksum_algorithm" =>
Ok(Self::Checksum),
- "aws_container_credentials_relative_uri" => {
- Ok(Self::ContainerCredentialsRelativeUri)
- }
- "aws_skip_signature" | "skip_signature" => Ok(Self::SkipSignature),
- "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
- // Backwards compatibility
- "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
- _ => match s.parse() {
- Ok(key) => Ok(Self::Client(key)),
- Err(_) => Err(Error::UnknownConfigurationKey { key: s.into()
}.into()),
- },
- }
- }
-}
-
-impl AmazonS3Builder {
- /// Create a new [`AmazonS3Builder`] with default values.
- pub fn new() -> Self {
- Default::default()
- }
-
- /// Fill the [`AmazonS3Builder`] with regular AWS environment variables
- ///
- /// Variables extracted from environment:
- /// * `AWS_ACCESS_KEY_ID` -> access_key_id
- /// * `AWS_SECRET_ACCESS_KEY` -> secret_access_key
- /// * `AWS_DEFAULT_REGION` -> region
- /// * `AWS_ENDPOINT` -> endpoint
- /// * `AWS_SESSION_TOKEN` -> token
- /// * `AWS_CONTAINER_CREDENTIALS_RELATIVE_URI` ->
<https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
- /// * `AWS_ALLOW_HTTP` -> set to "true" to permit HTTP connections without
TLS
- /// # Example
- /// ```
- /// use object_store::aws::AmazonS3Builder;
- ///
- /// let s3 = AmazonS3Builder::from_env()
- /// .with_bucket_name("foo")
- /// .build();
- /// ```
- pub fn from_env() -> Self {
- let mut builder: Self = Default::default();
-
- for (os_key, os_value) in std::env::vars_os() {
- if let (Some(key), Some(value)) = (os_key.to_str(),
os_value.to_str()) {
- if key.starts_with("AWS_") {
- if let Ok(config_key) = key.to_ascii_lowercase().parse() {
- builder = builder.with_config(config_key, value);
- }
- }
- }
- }
-
- builder
- }
-
- /// Parse available connection info form a well-known storage URL.
- ///
- /// The supported url schemes are:
- ///
- /// - `s3://<bucket>/<path>`
- /// - `s3a://<bucket>/<path>`
- /// - `https://s3.<region>.amazonaws.com/<bucket>`
- /// - `https://<bucket>.s3.<region>.amazonaws.com`
- /// - `https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket`
- ///
- /// Note: Settings derived from the URL will override any others set on
this builder
- ///
- /// # Example
- /// ```
- /// use object_store::aws::AmazonS3Builder;
- ///
- /// let s3 = AmazonS3Builder::from_env()
- /// .with_url("s3://bucket/path")
- /// .build();
- /// ```
- pub fn with_url(mut self, url: impl Into<String>) -> Self {
- self.url = Some(url.into());
- self
- }
-
- /// Set an option on the builder via a key - value pair.
- pub fn with_config(
- mut self,
- key: AmazonS3ConfigKey,
- value: impl Into<String>,
- ) -> Self {
- match key {
- AmazonS3ConfigKey::AccessKeyId => self.access_key_id =
Some(value.into()),
- AmazonS3ConfigKey::SecretAccessKey => {
- self.secret_access_key = Some(value.into())
- }
- AmazonS3ConfigKey::Region => self.region = Some(value.into()),
- AmazonS3ConfigKey::Bucket => self.bucket_name = Some(value.into()),
- AmazonS3ConfigKey::Endpoint => self.endpoint = Some(value.into()),
- AmazonS3ConfigKey::Token => self.token = Some(value.into()),
- AmazonS3ConfigKey::ImdsV1Fallback =>
self.imdsv1_fallback.parse(value),
- AmazonS3ConfigKey::VirtualHostedStyleRequest => {
- self.virtual_hosted_style_request.parse(value)
- }
- AmazonS3ConfigKey::DefaultRegion => {
- self.region = self.region.or_else(|| Some(value.into()))
- }
- AmazonS3ConfigKey::MetadataEndpoint => {
- self.metadata_endpoint = Some(value.into())
- }
- AmazonS3ConfigKey::UnsignedPayload =>
self.unsigned_payload.parse(value),
- AmazonS3ConfigKey::Checksum => {
- self.checksum_algorithm =
Some(ConfigValue::Deferred(value.into()))
- }
- AmazonS3ConfigKey::ContainerCredentialsRelativeUri => {
- self.container_credentials_relative_uri = Some(value.into())
- }
- AmazonS3ConfigKey::Client(key) => {
- self.client_options = self.client_options.with_config(key,
value)
- }
- AmazonS3ConfigKey::SkipSignature =>
self.skip_signature.parse(value),
- AmazonS3ConfigKey::CopyIfNotExists => {
- self.copy_if_not_exists =
Some(ConfigValue::Deferred(value.into()))
- }
- };
- self
- }
-
- /// Set an option on the builder via a key - value pair.
- ///
- /// This method will return an `UnknownConfigKey` error if key cannot be
parsed into [`AmazonS3ConfigKey`].
- #[deprecated(note = "Use with_config")]
- pub fn try_with_option(
- self,
- key: impl AsRef<str>,
- value: impl Into<String>,
- ) -> Result<Self> {
- Ok(self.with_config(key.as_ref().parse()?, value))
- }
-
- /// Hydrate builder from key value pairs
- ///
- /// This method will return an `UnknownConfigKey` error if any key cannot
be parsed into [`AmazonS3ConfigKey`].
- #[deprecated(note = "Use with_config")]
- #[allow(deprecated)]
- pub fn try_with_options<
- I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>,
- >(
- mut self,
- options: I,
- ) -> Result<Self> {
- for (key, value) in options {
- self = self.try_with_option(key, value)?;
- }
- Ok(self)
- }
-
- /// Get config value via a [`AmazonS3ConfigKey`].
- ///
- /// # Example
- /// ```
- /// use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
- ///
- /// let builder = AmazonS3Builder::from_env()
- /// .with_bucket_name("foo");
- /// let bucket_name =
builder.get_config_value(&AmazonS3ConfigKey::Bucket).unwrap_or_default();
- /// assert_eq!("foo", &bucket_name);
- /// ```
- pub fn get_config_value(&self, key: &AmazonS3ConfigKey) -> Option<String> {
- match key {
- AmazonS3ConfigKey::AccessKeyId => self.access_key_id.clone(),
- AmazonS3ConfigKey::SecretAccessKey =>
self.secret_access_key.clone(),
- AmazonS3ConfigKey::Region | AmazonS3ConfigKey::DefaultRegion => {
- self.region.clone()
- }
- AmazonS3ConfigKey::Bucket => self.bucket_name.clone(),
- AmazonS3ConfigKey::Endpoint => self.endpoint.clone(),
- AmazonS3ConfigKey::Token => self.token.clone(),
- AmazonS3ConfigKey::ImdsV1Fallback =>
Some(self.imdsv1_fallback.to_string()),
- AmazonS3ConfigKey::VirtualHostedStyleRequest => {
- Some(self.virtual_hosted_style_request.to_string())
- }
- AmazonS3ConfigKey::MetadataEndpoint =>
self.metadata_endpoint.clone(),
- AmazonS3ConfigKey::UnsignedPayload =>
Some(self.unsigned_payload.to_string()),
- AmazonS3ConfigKey::Checksum => {
- self.checksum_algorithm.as_ref().map(ToString::to_string)
- }
- AmazonS3ConfigKey::Client(key) =>
self.client_options.get_config_value(key),
- AmazonS3ConfigKey::ContainerCredentialsRelativeUri => {
- self.container_credentials_relative_uri.clone()
- }
- AmazonS3ConfigKey::SkipSignature =>
Some(self.skip_signature.to_string()),
- AmazonS3ConfigKey::CopyIfNotExists => {
- self.copy_if_not_exists.as_ref().map(ToString::to_string)
- }
- }
- }
-
- /// Sets properties on this builder based on a URL
- ///
- /// This is a separate member function to allow fallible computation to
- /// be deferred until [`Self::build`] which in turn allows deriving
[`Clone`]
- fn parse_url(&mut self, url: &str) -> Result<()> {
- let parsed = Url::parse(url).context(UnableToParseUrlSnafu { url })?;
- let host = parsed.host_str().context(UrlNotRecognisedSnafu { url })?;
- match parsed.scheme() {
- "s3" | "s3a" => self.bucket_name = Some(host.to_string()),
- "https" => match host.splitn(4, '.').collect_tuple() {
- Some(("s3", region, "amazonaws", "com")) => {
- self.region = Some(region.to_string());
- let bucket =
parsed.path_segments().into_iter().flatten().next();
- if let Some(bucket) = bucket {
- self.bucket_name = Some(bucket.into());
- }
- }
- Some((bucket, "s3", region, "amazonaws.com")) => {
- self.bucket_name = Some(bucket.to_string());
- self.region = Some(region.to_string());
- self.virtual_hosted_style_request = true.into();
- }
- Some((account, "r2", "cloudflarestorage", "com")) => {
- self.region = Some("auto".to_string());
- let endpoint =
format!("https://{account}.r2.cloudflarestorage.com");
- self.endpoint = Some(endpoint);
-
- let bucket =
parsed.path_segments().into_iter().flatten().next();
- if let Some(bucket) = bucket {
- self.bucket_name = Some(bucket.into());
- }
- }
- _ => return Err(UrlNotRecognisedSnafu { url }.build().into()),
- },
- scheme => return Err(UnknownUrlSchemeSnafu { scheme
}.build().into()),
- };
- Ok(())
- }
-
- /// Set the AWS Access Key (required)
- pub fn with_access_key_id(mut self, access_key_id: impl Into<String>) ->
Self {
- self.access_key_id = Some(access_key_id.into());
- self
- }
-
- /// Set the AWS Secret Access Key (required)
- pub fn with_secret_access_key(
- mut self,
- secret_access_key: impl Into<String>,
- ) -> Self {
- self.secret_access_key = Some(secret_access_key.into());
- self
- }
-
- /// Set the region (e.g. `us-east-1`) (required)
- pub fn with_region(mut self, region: impl Into<String>) -> Self {
- self.region = Some(region.into());
- self
- }
-
- /// Set the bucket_name (required)
- pub fn with_bucket_name(mut self, bucket_name: impl Into<String>) -> Self {
- self.bucket_name = Some(bucket_name.into());
- self
- }
-
- /// Sets the endpoint for communicating with AWS S3. Default value
- /// is based on region. The `endpoint` field should be consistent with
- /// the field `virtual_hosted_style_request'.
- ///
- /// For example, this might be set to `"http://localhost:4566:`
- /// for testing against a localstack instance.
- /// If `virtual_hosted_style_request` is set to true then `endpoint`
- /// should have bucket name included.
- pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
- self.endpoint = Some(endpoint.into());
- self
- }
-
- /// Set the token to use for requests (passed to underlying provider)
- pub fn with_token(mut self, token: impl Into<String>) -> Self {
- self.token = Some(token.into());
- self
- }
-
- /// Set the credential provider overriding any other options
- pub fn with_credentials(mut self, credentials: AwsCredentialProvider) ->
Self {
- self.credentials = Some(credentials);
- self
- }
-
- /// Sets what protocol is allowed. If `allow_http` is :
- /// * false (default): Only HTTPS are allowed
- /// * true: HTTP and HTTPS are allowed
- pub fn with_allow_http(mut self, allow_http: bool) -> Self {
- self.client_options = self.client_options.with_allow_http(allow_http);
- self
- }
-
- /// Sets if virtual hosted style request has to be used.
- /// If `virtual_hosted_style_request` is :
- /// * false (default): Path style request is used
- /// * true: Virtual hosted style request is used
- ///
- /// If the `endpoint` is provided then it should be
- /// consistent with `virtual_hosted_style_request`.
- /// i.e. if `virtual_hosted_style_request` is set to true
- /// then `endpoint` should have bucket name included.
- pub fn with_virtual_hosted_style_request(
- mut self,
- virtual_hosted_style_request: bool,
- ) -> Self {
- self.virtual_hosted_style_request =
virtual_hosted_style_request.into();
- self
- }
-
- /// Set the retry configuration
- pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
- self.retry_config = retry_config;
- self
- }
-
- /// By default instance credentials will only be fetched over [IMDSv2], as
AWS recommends
- /// against having IMDSv1 enabled on EC2 instances as it is vulnerable to
[SSRF attack]
- ///
- /// However, certain deployment environments, such as those running old
versions of kube2iam,
- /// may not support IMDSv2. This option will enable automatic fallback to
using IMDSv1
- /// if the token endpoint returns a 403 error indicating that IMDSv2 is
not supported.
- ///
- /// This option has no effect if not using instance credentials
- ///
- /// [IMDSv2]:
https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-instance-metadata-service.html
- /// [SSRF attack]:
https://aws.amazon.com/blogs/security/defense-in-depth-open-firewalls-reverse-proxies-ssrf-vulnerabilities-ec2-instance-metadata-service/
- ///
- pub fn with_imdsv1_fallback(mut self) -> Self {
- self.imdsv1_fallback = true.into();
- self
- }
-
- /// Sets if unsigned payload option has to be used.
- /// See [unsigned payload
option](https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html)
- /// * false (default): Signed payload option is used, where the checksum
for the request body is computed and included when constructing a canonical
request.
- /// * true: Unsigned payload option is used. `UNSIGNED-PAYLOAD` literal is
included when constructing a canonical request,
- pub fn with_unsigned_payload(mut self, unsigned_payload: bool) -> Self {
- self.unsigned_payload = unsigned_payload.into();
- self
- }
-
- /// If enabled, [`AmazonS3`] will not fetch credentials and will not sign
requests
- ///
- /// This can be useful when interacting with public S3 buckets that deny
authorized requests
- pub fn with_skip_signature(mut self, skip_signature: bool) -> Self {
- self.skip_signature = skip_signature.into();
- self
- }
-
- /// Sets the [checksum algorithm] which has to be used for object
integrity check during upload.
- ///
- /// [checksum algorithm]:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
- pub fn with_checksum_algorithm(mut self, checksum_algorithm: Checksum) ->
Self {
- // Convert to String to enable deferred parsing of config
- self.checksum_algorithm = Some(checksum_algorithm.into());
- self
- }
-
- /// Set the [instance metadata
endpoint](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html),
- /// used primarily within AWS EC2.
- ///
- /// This defaults to the IPv4 endpoint: http://169.254.169.254. One can
alternatively use the IPv6
- /// endpoint http://fd00:ec2::254.
- pub fn with_metadata_endpoint(mut self, endpoint: impl Into<String>) ->
Self {
- self.metadata_endpoint = Some(endpoint.into());
- self
- }
-
- /// Set the proxy_url to be used by the underlying client
- pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
- self.client_options = self.client_options.with_proxy_url(proxy_url);
- self
- }
-
- /// Set a trusted proxy CA certificate
- pub fn with_proxy_ca_certificate(
- mut self,
- proxy_ca_certificate: impl Into<String>,
- ) -> Self {
- self.client_options = self
- .client_options
- .with_proxy_ca_certificate(proxy_ca_certificate);
- self
- }
-
- /// Set a list of hosts to exclude from proxy connections
- pub fn with_proxy_excludes(mut self, proxy_excludes: impl Into<String>) ->
Self {
- self.client_options =
self.client_options.with_proxy_excludes(proxy_excludes);
- self
- }
-
- /// Sets the client options, overriding any already set
- pub fn with_client_options(mut self, options: ClientOptions) -> Self {
- self.client_options = options;
- self
- }
-
- /// Configure how to provide [`ObjectStore::copy_if_not_exists`]
- pub fn with_copy_if_not_exists(mut self, config: S3CopyIfNotExists) ->
Self {
- self.copy_if_not_exists = Some(config.into());
- self
- }
-
- /// Create a [`AmazonS3`] instance from the provided values,
- /// consuming `self`.
- pub fn build(mut self) -> Result<AmazonS3> {
- if let Some(url) = self.url.take() {
- self.parse_url(&url)?;
- }
-
- let bucket = self.bucket_name.context(MissingBucketNameSnafu)?;
- let region = self.region.context(MissingRegionSnafu)?;
- let checksum = self.checksum_algorithm.map(|x| x.get()).transpose()?;
- let copy_if_not_exists = self.copy_if_not_exists.map(|x|
x.get()).transpose()?;
-
- let credentials = if let Some(credentials) = self.credentials {
- credentials
- } else if self.access_key_id.is_some() ||
self.secret_access_key.is_some() {
- match (self.access_key_id, self.secret_access_key, self.token) {
- (Some(key_id), Some(secret_key), token) => {
- info!("Using Static credential provider");
- let credential = AwsCredential {
- key_id,
- secret_key,
- token,
- };
- Arc::new(StaticCredentialProvider::new(credential)) as _
- }
- (None, Some(_), _) => return
Err(Error::MissingAccessKeyId.into()),
- (Some(_), None, _) => return
Err(Error::MissingSecretAccessKey.into()),
- (None, None, _) => unreachable!(),
- }
- } else if let (Ok(token_path), Ok(role_arn)) = (
- std::env::var("AWS_WEB_IDENTITY_TOKEN_FILE"),
- std::env::var("AWS_ROLE_ARN"),
- ) {
- // TODO: Replace with `AmazonS3Builder::credentials_from_env`
- info!("Using WebIdentity credential provider");
-
- let session_name = std::env::var("AWS_ROLE_SESSION_NAME")
- .unwrap_or_else(|_| "WebIdentitySession".to_string());
-
- let endpoint = format!("https://sts.{region}.amazonaws.com");
-
- // Disallow non-HTTPs requests
- let client = self
- .client_options
- .clone()
- .with_allow_http(false)
- .client()?;
-
- let token = WebIdentityProvider {
- token_path,
- session_name,
- role_arn,
- endpoint,
- };
-
- Arc::new(TokenCredentialProvider::new(
- token,
- client,
- self.retry_config.clone(),
- )) as _
- } else if let Some(uri) = self.container_credentials_relative_uri {
- info!("Using Task credential provider");
- Arc::new(TaskCredentialProvider {
- url: format!("http://169.254.170.2{uri}"),
- retry: self.retry_config.clone(),
- // The instance metadata endpoint is access over HTTP
- client:
self.client_options.clone().with_allow_http(true).client()?,
- cache: Default::default(),
- }) as _
- } else {
- info!("Using Instance credential provider");
-
- let token = InstanceCredentialProvider {
- cache: Default::default(),
- imdsv1_fallback: self.imdsv1_fallback.get()?,
- metadata_endpoint: self
- .metadata_endpoint
- .unwrap_or_else(|| DEFAULT_METADATA_ENDPOINT.into()),
- };
-
- Arc::new(TokenCredentialProvider::new(
- token,
- self.client_options.metadata_client()?,
- self.retry_config.clone(),
- )) as _
- };
-
- let endpoint: String;
- let bucket_endpoint: String;
-
- // If `endpoint` is provided then its assumed to be consistent with
- // `virtual_hosted_style_request`. i.e. if
`virtual_hosted_style_request` is true then
- // `endpoint` should have bucket name included.
- if self.virtual_hosted_style_request.get()? {
- endpoint = self
- .endpoint
- .unwrap_or_else(||
format!("https://{bucket}.s3.{region}.amazonaws.com"));
- bucket_endpoint = endpoint.clone();
- } else {
- endpoint = self
- .endpoint
- .unwrap_or_else(||
format!("https://s3.{region}.amazonaws.com"));
- bucket_endpoint = format!("{endpoint}/{bucket}");
- }
-
- let config = S3Config {
- region,
- endpoint,
- bucket,
- bucket_endpoint,
- credentials,
- retry_config: self.retry_config,
- client_options: self.client_options,
- sign_payload: !self.unsigned_payload.get()?,
- skip_signature: self.skip_signature.get()?,
- checksum,
- copy_if_not_exists,
- };
-
- let client = Arc::new(S3Client::new(config)?);
-
- Ok(AmazonS3 { client })
- }
-}
-
#[cfg(test)]
mod tests {
use super::*;
- use crate::tests::{
- copy_if_not_exists, get_nonexistent_object, get_opts,
- list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list_opts,
- rename_and_copy, stream_get,
- };
+ use crate::tests::*;
use bytes::Bytes;
- use std::collections::HashMap;
const NON_EXISTENT_NAME: &str = "nonexistentname";
- #[test]
- fn s3_test_config_from_map() {
- let aws_access_key_id = "object_store:fake_access_key_id".to_string();
- let aws_secret_access_key = "object_store:fake_secret_key".to_string();
- let aws_default_region =
"object_store:fake_default_region".to_string();
- let aws_endpoint = "object_store:fake_endpoint".to_string();
- let aws_session_token = "object_store:fake_session_token".to_string();
- let options = HashMap::from([
- ("aws_access_key_id", aws_access_key_id.clone()),
- ("aws_secret_access_key", aws_secret_access_key),
- ("aws_default_region", aws_default_region.clone()),
- ("aws_endpoint", aws_endpoint.clone()),
- ("aws_session_token", aws_session_token.clone()),
- ("aws_unsigned_payload", "true".to_string()),
- ("aws_checksum_algorithm", "sha256".to_string()),
- ]);
-
- let builder = options
- .into_iter()
- .fold(AmazonS3Builder::new(), |builder, (key, value)| {
- builder.with_config(key.parse().unwrap(), value)
- })
- .with_config(AmazonS3ConfigKey::SecretAccessKey, "new-secret-key");
-
- assert_eq!(builder.access_key_id.unwrap(), aws_access_key_id.as_str());
- assert_eq!(builder.secret_access_key.unwrap(), "new-secret-key");
- assert_eq!(builder.region.unwrap(), aws_default_region);
- assert_eq!(builder.endpoint.unwrap(), aws_endpoint);
- assert_eq!(builder.token.unwrap(), aws_session_token);
- assert_eq!(
- builder.checksum_algorithm.unwrap().get().unwrap(),
- Checksum::SHA256
- );
- assert!(builder.unsigned_payload.get().unwrap());
- }
-
- #[test]
- fn s3_test_config_get_value() {
- let aws_access_key_id = "object_store:fake_access_key_id".to_string();
- let aws_secret_access_key = "object_store:fake_secret_key".to_string();
- let aws_default_region =
"object_store:fake_default_region".to_string();
- let aws_endpoint = "object_store:fake_endpoint".to_string();
- let aws_session_token = "object_store:fake_session_token".to_string();
-
- let builder = AmazonS3Builder::new()
- .with_config(AmazonS3ConfigKey::AccessKeyId, &aws_access_key_id)
- .with_config(AmazonS3ConfigKey::SecretAccessKey,
&aws_secret_access_key)
- .with_config(AmazonS3ConfigKey::DefaultRegion, &aws_default_region)
- .with_config(AmazonS3ConfigKey::Endpoint, &aws_endpoint)
- .with_config(AmazonS3ConfigKey::Token, &aws_session_token)
- .with_config(AmazonS3ConfigKey::UnsignedPayload, "true");
-
- assert_eq!(
- builder
- .get_config_value(&AmazonS3ConfigKey::AccessKeyId)
- .unwrap(),
- aws_access_key_id
- );
- assert_eq!(
- builder
- .get_config_value(&AmazonS3ConfigKey::SecretAccessKey)
- .unwrap(),
- aws_secret_access_key
- );
- assert_eq!(
- builder
- .get_config_value(&AmazonS3ConfigKey::DefaultRegion)
- .unwrap(),
- aws_default_region
- );
- assert_eq!(
- builder
- .get_config_value(&AmazonS3ConfigKey::Endpoint)
- .unwrap(),
- aws_endpoint
- );
- assert_eq!(
- builder.get_config_value(&AmazonS3ConfigKey::Token).unwrap(),
- aws_session_token
- );
- assert_eq!(
- builder
- .get_config_value(&AmazonS3ConfigKey::UnsignedPayload)
- .unwrap(),
- "true"
- );
- }
-
#[tokio::test]
async fn s3_test() {
crate::test_util::maybe_skip_integration!();
let config = AmazonS3Builder::from_env();
- let is_local = matches!(&config.endpoint, Some(e) if
e.starts_with("http://"));
- let test_not_exists = config.copy_if_not_exists.is_some();
let integration = config.build().unwrap();
+ let config = integration.client.config();
+ let is_local = config.endpoint.starts_with("http://");
+ let test_not_exists = config.copy_if_not_exists.is_some();
// Localstack doesn't support listing with spaces
https://github.com/localstack/localstack/issues/6328
put_get_delete_list_opts(&integration, is_local).await;
@@ -1279,16 +309,14 @@ mod tests {
}
// run integration test with unsigned payload enabled
- let config = AmazonS3Builder::from_env().with_unsigned_payload(true);
- let is_local = matches!(&config.endpoint, Some(e) if
e.starts_with("http://"));
- let integration = config.build().unwrap();
+ let builder = AmazonS3Builder::from_env().with_unsigned_payload(true);
+ let integration = builder.build().unwrap();
put_get_delete_list_opts(&integration, is_local).await;
// run integration test with checksum set to sha256
- let config =
+ let builder =
AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
- let is_local = matches!(&config.endpoint, Some(e) if
e.starts_with("http://"));
- let integration = config.build().unwrap();
+ let integration = builder.build().unwrap();
put_get_delete_list_opts(&integration, is_local).await;
}
@@ -1352,161 +380,6 @@ mod tests {
assert!(matches!(err, crate::Error::NotFound { .. }), "{}", err);
}
- #[tokio::test]
- async fn s3_test_proxy_url() {
- let s3 = AmazonS3Builder::new()
- .with_access_key_id("access_key_id")
- .with_secret_access_key("secret_access_key")
- .with_region("region")
- .with_bucket_name("bucket_name")
- .with_allow_http(true)
- .with_proxy_url("https://example.com")
- .build();
-
- assert!(s3.is_ok());
-
- let err = AmazonS3Builder::new()
- .with_access_key_id("access_key_id")
- .with_secret_access_key("secret_access_key")
- .with_region("region")
- .with_bucket_name("bucket_name")
- .with_allow_http(true)
- .with_proxy_url("asdf://example.com")
- .build()
- .unwrap_err()
- .to_string();
-
- assert_eq!(
- "Generic HTTP client error: builder error: unknown proxy scheme",
- err
- );
- }
-
- #[test]
- fn s3_test_urls() {
- let mut builder = AmazonS3Builder::new();
- builder.parse_url("s3://bucket/path").unwrap();
- assert_eq!(builder.bucket_name, Some("bucket".to_string()));
-
- let mut builder = AmazonS3Builder::new();
- builder
- .parse_url("s3://buckets.can.have.dots/path")
- .unwrap();
- assert_eq!(
- builder.bucket_name,
- Some("buckets.can.have.dots".to_string())
- );
-
- let mut builder = AmazonS3Builder::new();
- builder
- .parse_url("https://s3.region.amazonaws.com")
- .unwrap();
- assert_eq!(builder.region, Some("region".to_string()));
-
- let mut builder = AmazonS3Builder::new();
- builder
- .parse_url("https://s3.region.amazonaws.com/bucket")
- .unwrap();
- assert_eq!(builder.region, Some("region".to_string()));
- assert_eq!(builder.bucket_name, Some("bucket".to_string()));
-
- let mut builder = AmazonS3Builder::new();
- builder
- .parse_url("https://s3.region.amazonaws.com/bucket.with.dot/path")
- .unwrap();
- assert_eq!(builder.region, Some("region".to_string()));
- assert_eq!(builder.bucket_name, Some("bucket.with.dot".to_string()));
-
- let mut builder = AmazonS3Builder::new();
- builder
- .parse_url("https://bucket.s3.region.amazonaws.com")
- .unwrap();
- assert_eq!(builder.bucket_name, Some("bucket".to_string()));
- assert_eq!(builder.region, Some("region".to_string()));
- assert!(builder.virtual_hosted_style_request.get().unwrap());
-
- let mut builder = AmazonS3Builder::new();
- builder
-
.parse_url("https://account123.r2.cloudflarestorage.com/bucket-123")
- .unwrap();
-
- assert_eq!(builder.bucket_name, Some("bucket-123".to_string()));
- assert_eq!(builder.region, Some("auto".to_string()));
- assert_eq!(
- builder.endpoint,
- Some("https://account123.r2.cloudflarestorage.com".to_string())
- );
-
- let err_cases = [
- "mailto://bucket/path",
- "https://s3.bucket.mydomain.com",
- "https://s3.bucket.foo.amazonaws.com",
- "https://bucket.mydomain.region.amazonaws.com",
- "https://bucket.s3.region.bar.amazonaws.com",
- "https://bucket.foo.s3.amazonaws.com",
- ];
- let mut builder = AmazonS3Builder::new();
- for case in err_cases {
- builder.parse_url(case).unwrap_err();
- }
- }
-
- #[test]
- fn test_invalid_config() {
- let err = AmazonS3Builder::new()
- .with_config(AmazonS3ConfigKey::ImdsV1Fallback, "enabled")
- .with_bucket_name("bucket")
- .with_region("region")
- .build()
- .unwrap_err()
- .to_string();
-
- assert_eq!(
- err,
- "Generic Config error: failed to parse \"enabled\" as boolean"
- );
-
- let err = AmazonS3Builder::new()
- .with_config(AmazonS3ConfigKey::Checksum, "md5")
- .with_bucket_name("bucket")
- .with_region("region")
- .build()
- .unwrap_err()
- .to_string();
-
- assert_eq!(
- err,
- "Generic Config error: \"md5\" is not a valid checksum algorithm"
- );
- }
-}
-
-#[cfg(test)]
-mod s3_resolve_bucket_region_tests {
- use super::*;
-
- #[tokio::test]
- async fn test_private_bucket() {
- let bucket = "bloxbender";
-
- let region = resolve_bucket_region(bucket, &ClientOptions::new())
- .await
- .unwrap();
-
- let expected = "us-west-2".to_string();
-
- assert_eq!(region, expected);
- }
-
- #[tokio::test]
- async fn test_bucket_does_not_exist() {
- let bucket = "please-dont-exist";
-
- let result = resolve_bucket_region(bucket,
&ClientOptions::new()).await;
-
- assert!(result.is_err());
- }
-
#[tokio::test]
#[ignore = "Tests shouldn't call use remote services by default"]
async fn test_disable_creds() {
diff --git a/object_store/src/aws/resolve.rs b/object_store/src/aws/resolve.rs
new file mode 100644
index 0000000000..2b21fabd34
--- /dev/null
+++ b/object_store/src/aws/resolve.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aws::STORE;
+use crate::{ClientOptions, Result};
+use snafu::{ensure, OptionExt, ResultExt, Snafu};
+
+/// A specialized `Error` for object store-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum Error {
+ #[snafu(display("Bucket '{}' not found", bucket))]
+ BucketNotFound { bucket: String },
+
+ #[snafu(display("Failed to resolve region for bucket '{}'", bucket))]
+ ResolveRegion {
+ bucket: String,
+ source: reqwest::Error,
+ },
+
+ #[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
+ RegionParse { bucket: String },
+}
+
+impl From<Error> for crate::Error {
+ fn from(source: Error) -> Self {
+ Self::Generic {
+ store: STORE,
+ source: Box::new(source),
+ }
+ }
+}
+
+/// Get the bucket region using the [HeadBucket API]. This will fail if the
bucket does not exist.
+///
+/// [HeadBucket API]:
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadBucket.html
+pub async fn resolve_bucket_region(
+ bucket: &str,
+ client_options: &ClientOptions,
+) -> Result<String> {
+ use reqwest::StatusCode;
+
+ let endpoint = format!("https://{}.s3.amazonaws.com", bucket);
+
+ let client = client_options.client()?;
+
+ let response = client
+ .head(&endpoint)
+ .send()
+ .await
+ .context(ResolveRegionSnafu { bucket })?;
+
+ ensure!(
+ response.status() != StatusCode::NOT_FOUND,
+ BucketNotFoundSnafu { bucket }
+ );
+
+ let region = response
+ .headers()
+ .get("x-amz-bucket-region")
+ .and_then(|x| x.to_str().ok())
+ .context(RegionParseSnafu { bucket })?;
+
+ Ok(region.to_string())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[tokio::test]
+ async fn test_private_bucket() {
+ let bucket = "bloxbender";
+
+ let region = resolve_bucket_region(bucket, &ClientOptions::new())
+ .await
+ .unwrap();
+
+ let expected = "us-west-2".to_string();
+
+ assert_eq!(region, expected);
+ }
+
+ #[tokio::test]
+ async fn test_bucket_does_not_exist() {
+ let bucket = "please-dont-exist";
+
+ let result = resolve_bucket_region(bucket,
&ClientOptions::new()).await;
+
+ assert!(result.is_err());
+ }
+}