This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new f597d3a687 Split gcp Module (#4956)
f597d3a687 is described below
commit f597d3a6874264ebd9cf28a0d07a7fae52df440b
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Oct 19 13:45:43 2023 +0100
Split gcp Module (#4956)
* Split out GCP client
* Split out builder
* RAT
---
object_store/src/gcp/{mod.rs => builder.rs} | 802 +++-----------------
object_store/src/gcp/client.rs | 446 +++++++++++
object_store/src/gcp/mod.rs | 1097 +--------------------------
3 files changed, 557 insertions(+), 1788 deletions(-)
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/builder.rs
similarity index 51%
copy from object_store/src/gcp/mod.rs
copy to object_store/src/gcp/builder.rs
index 97755c07c6..920ab8b2a9 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/builder.rs
@@ -15,118 +15,26 @@
// specific language governing permissions and limitations
// under the License.
-//! An object store implementation for Google Cloud Storage
-//!
-//! ## Multi-part uploads
-//!
-//! [Multi-part
uploads](https://cloud.google.com/storage/docs/multipart-uploads)
-//! can be initiated with the [ObjectStore::put_multipart] method.
-//! Data passed to the writer is automatically buffered to meet the minimum
size
-//! requirements for a part. Multiple parts are uploaded concurrently.
-//!
-//! If the writer fails for any reason, you may have parts uploaded to GCS but
not
-//! used that you may be charged for. Use the [ObjectStore::abort_multipart]
method
-//! to abort the upload and drop those unneeded parts. In addition, you may
wish to
-//! consider implementing automatic clean up of unused parts that are older
than one
-//! week.
-use std::str::FromStr;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use bytes::{Buf, Bytes};
-use futures::stream::BoxStream;
-use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
-use reqwest::{header, Client, Method, Response, StatusCode};
-use serde::{Deserialize, Serialize};
-use snafu::{OptionExt, ResultExt, Snafu};
-use tokio::io::AsyncWrite;
-use url::Url;
-
-use crate::client::get::{GetClient, GetClientExt};
-use crate::client::list::{ListClient, ListClientExt};
-use crate::client::list_response::ListResponse;
-use crate::client::retry::RetryExt;
-use crate::client::{
- ClientConfigKey, CredentialProvider, GetOptionsExt,
StaticCredentialProvider,
- TokenCredentialProvider,
+use crate::client::TokenCredentialProvider;
+use crate::gcp::client::{GoogleCloudStorageClient, GoogleCloudStorageConfig};
+use crate::gcp::credential::{
+ ApplicationDefaultCredentials, InstanceCredentialProvider,
ServiceAccountCredentials,
+ DEFAULT_GCS_BASE_URL,
+};
+use crate::gcp::{
+ credential, GcpCredential, GcpCredentialProvider, GoogleCloudStorage,
STORE,
};
use crate::{
- multipart::{PartId, PutPart, WriteMultiPart},
- path::{Path, DELIMITER},
- ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
- ObjectStore, PutResult, Result, RetryConfig,
+ ClientConfigKey, ClientOptions, Result, RetryConfig,
StaticCredentialProvider,
};
-
-use credential::{InstanceCredentialProvider, ServiceAccountCredentials};
-
-mod credential;
-
-const STORE: &str = "GCS";
-
-/// [`CredentialProvider`] for [`GoogleCloudStorage`]
-pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential =
GcpCredential>>;
-use crate::client::header::get_etag;
-use crate::gcp::credential::{ApplicationDefaultCredentials,
DEFAULT_GCS_BASE_URL};
-pub use credential::GcpCredential;
+use serde::{Deserialize, Serialize};
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::str::FromStr;
+use std::sync::Arc;
+use url::Url;
#[derive(Debug, Snafu)]
enum Error {
- #[snafu(display("Got invalid XML response for {} {}: {}", method, url,
source))]
- InvalidXMLResponse {
- source: quick_xml::de::DeError,
- method: String,
- url: String,
- data: Bytes,
- },
-
- #[snafu(display("Error performing list request: {}", source))]
- ListRequest { source: crate::client::retry::Error },
-
- #[snafu(display("Error getting list response body: {}", source))]
- ListResponseBody { source: reqwest::Error },
-
- #[snafu(display("Got invalid list response: {}", source))]
- InvalidListResponse { source: quick_xml::de::DeError },
-
- #[snafu(display("Error performing get request {}: {}", path, source))]
- GetRequest {
- source: crate::client::retry::Error,
- path: String,
- },
-
- #[snafu(display("Error getting get response body {}: {}", path, source))]
- GetResponseBody {
- source: reqwest::Error,
- path: String,
- },
-
- #[snafu(display("Error performing delete request {}: {}", path, source))]
- DeleteRequest {
- source: crate::client::retry::Error,
- path: String,
- },
-
- #[snafu(display("Error performing put request {}: {}", path, source))]
- PutRequest {
- source: crate::client::retry::Error,
- path: String,
- },
-
- #[snafu(display("Error getting put response body: {}", source))]
- PutResponseBody { source: reqwest::Error },
-
- #[snafu(display("Got invalid put response: {}", source))]
- InvalidPutResponse { source: quick_xml::de::DeError },
-
- #[snafu(display("Error performing post request {}: {}", path, source))]
- PostRequest {
- source: crate::client::retry::Error,
- path: String,
- },
-
- #[snafu(display("Error decoding object size: {}", source))]
- InvalidSize { source: std::num::ParseIntError },
-
#[snafu(display("Missing bucket name"))]
MissingBucketName {},
@@ -135,9 +43,6 @@ enum Error {
))]
ServiceAccountPathAndKeyProvided,
- #[snafu(display("GCP credential error: {}", source))]
- Credential { source: credential::Error },
-
#[snafu(display("Unable parse source url. Url: {}, Error: {}", url,
source))]
UnableToParseUrl {
source: url::ParseError,
@@ -160,14 +65,14 @@ enum Error {
Metadata {
source: crate::client::header::Error,
},
+
+ #[snafu(display("GCP credential error: {}", source))]
+ Credential { source: credential::Error },
}
-impl From<Error> for super::Error {
+impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
match err {
- Error::GetRequest { source, path }
- | Error::DeleteRequest { source, path }
- | Error::PutRequest { source, path } => source.error(STORE, path),
Error::UnknownConfigurationKey { key } => {
Self::UnknownConfigurationKey { store: STORE, key }
}
@@ -179,428 +84,6 @@ impl From<Error> for super::Error {
}
}
-#[derive(serde::Deserialize, Debug)]
-#[serde(rename_all = "PascalCase")]
-struct InitiateMultipartUploadResult {
- upload_id: String,
-}
-
-#[derive(serde::Serialize, Debug)]
-#[serde(rename_all = "PascalCase", rename(serialize = "Part"))]
-struct MultipartPart {
- #[serde(rename = "PartNumber")]
- part_number: usize,
- e_tag: String,
-}
-
-#[derive(serde::Serialize, Debug)]
-#[serde(rename_all = "PascalCase")]
-struct CompleteMultipartUpload {
- #[serde(rename = "Part", default)]
- parts: Vec<MultipartPart>,
-}
-
-/// Interface for [Google Cloud Storage](https://cloud.google.com/storage/).
-#[derive(Debug)]
-pub struct GoogleCloudStorage {
- client: Arc<GoogleCloudStorageClient>,
-}
-
-impl std::fmt::Display for GoogleCloudStorage {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "GoogleCloudStorage({})", self.client.bucket_name)
- }
-}
-
-impl GoogleCloudStorage {
- /// Returns the [`GcpCredentialProvider`] used by [`GoogleCloudStorage`]
- pub fn credentials(&self) -> &GcpCredentialProvider {
- &self.client.credentials
- }
-}
-
-#[derive(Debug)]
-struct GoogleCloudStorageClient {
- client: Client,
- base_url: String,
-
- credentials: GcpCredentialProvider,
-
- bucket_name: String,
- bucket_name_encoded: String,
-
- retry_config: RetryConfig,
- client_options: ClientOptions,
-
- // TODO: Hook this up in tests
- max_list_results: Option<String>,
-}
-
-impl GoogleCloudStorageClient {
- async fn get_credential(&self) -> Result<Arc<GcpCredential>> {
- self.credentials.get_credential().await
- }
-
- fn object_url(&self, path: &Path) -> String {
- let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
- format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, encoded)
- }
-
- /// Perform a put request
<https://cloud.google.com/storage/docs/xml-api/put-object-upload>
- ///
- /// Returns the new ETag
- async fn put_request<T: Serialize + ?Sized + Sync>(
- &self,
- path: &Path,
- payload: Bytes,
- query: &T,
- ) -> Result<String> {
- let credential = self.get_credential().await?;
- let url = self.object_url(path);
-
- let content_type = self
- .client_options
- .get_content_type(path)
- .unwrap_or("application/octet-stream");
-
- let response = self
- .client
- .request(Method::PUT, url)
- .query(query)
- .bearer_auth(&credential.bearer)
- .header(header::CONTENT_TYPE, content_type)
- .header(header::CONTENT_LENGTH, payload.len())
- .body(payload)
- .send_retry(&self.retry_config)
- .await
- .context(PutRequestSnafu {
- path: path.as_ref(),
- })?;
-
- Ok(get_etag(response.headers()).context(MetadataSnafu)?)
- }
-
- /// Initiate a multi-part upload
<https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
- async fn multipart_initiate(&self, path: &Path) -> Result<MultipartId> {
- let credential = self.get_credential().await?;
- let url = format!("{}/{}/{}", self.base_url, self.bucket_name_encoded,
path);
-
- let content_type = self
- .client_options
- .get_content_type(path)
- .unwrap_or("application/octet-stream");
-
- let response = self
- .client
- .request(Method::POST, &url)
- .bearer_auth(&credential.bearer)
- .header(header::CONTENT_TYPE, content_type)
- .header(header::CONTENT_LENGTH, "0")
- .query(&[("uploads", "")])
- .send_retry(&self.retry_config)
- .await
- .context(PutRequestSnafu {
- path: path.as_ref(),
- })?;
-
- let data = response.bytes().await.context(PutResponseBodySnafu)?;
- let result: InitiateMultipartUploadResult =
- quick_xml::de::from_reader(data.as_ref().reader())
- .context(InvalidPutResponseSnafu)?;
-
- Ok(result.upload_id)
- }
-
- /// Cleanup unused parts
<https://cloud.google.com/storage/docs/xml-api/delete-multipart>
- async fn multipart_cleanup(
- &self,
- path: &str,
- multipart_id: &MultipartId,
- ) -> Result<()> {
- let credential = self.get_credential().await?;
- let url = format!("{}/{}/{}", self.base_url, self.bucket_name_encoded,
path);
-
- self.client
- .request(Method::DELETE, &url)
- .bearer_auth(&credential.bearer)
- .header(header::CONTENT_TYPE, "application/octet-stream")
- .header(header::CONTENT_LENGTH, "0")
- .query(&[("uploadId", multipart_id)])
- .send_retry(&self.retry_config)
- .await
- .context(PutRequestSnafu { path })?;
-
- Ok(())
- }
-
- /// Perform a delete request
<https://cloud.google.com/storage/docs/xml-api/delete-object>
- async fn delete_request(&self, path: &Path) -> Result<()> {
- let credential = self.get_credential().await?;
- let url = self.object_url(path);
-
- let builder = self.client.request(Method::DELETE, url);
- builder
- .bearer_auth(&credential.bearer)
- .send_retry(&self.retry_config)
- .await
- .context(DeleteRequestSnafu {
- path: path.as_ref(),
- })?;
-
- Ok(())
- }
-
- /// Perform a copy request
<https://cloud.google.com/storage/docs/xml-api/put-object-copy>
- async fn copy_request(
- &self,
- from: &Path,
- to: &Path,
- if_not_exists: bool,
- ) -> Result<()> {
- let credential = self.get_credential().await?;
- let url = self.object_url(to);
-
- let from = utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC);
- let source = format!("{}/{}", self.bucket_name_encoded, from);
-
- let mut builder = self
- .client
- .request(Method::PUT, url)
- .header("x-goog-copy-source", source);
-
- if if_not_exists {
- builder = builder.header("x-goog-if-generation-match", 0);
- }
-
- builder
- .bearer_auth(&credential.bearer)
- // Needed if reqwest is compiled with native-tls instead of
rustls-tls
- // See https://github.com/apache/arrow-rs/pull/3921
- .header(header::CONTENT_LENGTH, 0)
- .send_retry(&self.retry_config)
- .await
- .map_err(|err| match err.status() {
- Some(StatusCode::PRECONDITION_FAILED) =>
crate::Error::AlreadyExists {
- source: Box::new(err),
- path: to.to_string(),
- },
- _ => err.error(STORE, from.to_string()),
- })?;
-
- Ok(())
- }
-}
-
-#[async_trait]
-impl GetClient for GoogleCloudStorageClient {
- const STORE: &'static str = STORE;
-
- /// Perform a get request
<https://cloud.google.com/storage/docs/xml-api/get-object-download>
- async fn get_request(&self, path: &Path, options: GetOptions) ->
Result<Response> {
- let credential = self.get_credential().await?;
- let url = self.object_url(path);
-
- let method = match options.head {
- true => Method::HEAD,
- false => Method::GET,
- };
-
- let mut request = self.client.request(method,
url).with_get_options(options);
-
- if !credential.bearer.is_empty() {
- request = request.bearer_auth(&credential.bearer);
- }
-
- let response =
- request
- .send_retry(&self.retry_config)
- .await
- .context(GetRequestSnafu {
- path: path.as_ref(),
- })?;
-
- Ok(response)
- }
-}
-
-#[async_trait]
-impl ListClient for GoogleCloudStorageClient {
- /// Perform a list request
<https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
- async fn list_request(
- &self,
- prefix: Option<&str>,
- delimiter: bool,
- page_token: Option<&str>,
- offset: Option<&str>,
- ) -> Result<(ListResult, Option<String>)> {
- assert!(offset.is_none()); // Not yet supported
-
- let credential = self.get_credential().await?;
- let url = format!("{}/{}", self.base_url, self.bucket_name_encoded);
-
- let mut query = Vec::with_capacity(5);
- query.push(("list-type", "2"));
- if delimiter {
- query.push(("delimiter", DELIMITER))
- }
-
- if let Some(prefix) = &prefix {
- query.push(("prefix", prefix))
- }
-
- if let Some(page_token) = page_token {
- query.push(("continuation-token", page_token))
- }
-
- if let Some(max_results) = &self.max_list_results {
- query.push(("max-keys", max_results))
- }
-
- let response = self
- .client
- .request(Method::GET, url)
- .query(&query)
- .bearer_auth(&credential.bearer)
- .send_retry(&self.retry_config)
- .await
- .context(ListRequestSnafu)?
- .bytes()
- .await
- .context(ListResponseBodySnafu)?;
-
- let mut response: ListResponse =
quick_xml::de::from_reader(response.reader())
- .context(InvalidListResponseSnafu)?;
-
- let token = response.next_continuation_token.take();
- Ok((response.try_into()?, token))
- }
-}
-
-struct GCSMultipartUpload {
- client: Arc<GoogleCloudStorageClient>,
- path: Path,
- multipart_id: MultipartId,
-}
-
-#[async_trait]
-impl PutPart for GCSMultipartUpload {
- /// Upload an object part
<https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
- async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
- let upload_id = self.multipart_id.clone();
- let content_id = self
- .client
- .put_request(
- &self.path,
- buf.into(),
- &[
- ("partNumber", format!("{}", part_idx + 1)),
- ("uploadId", upload_id),
- ],
- )
- .await?;
-
- Ok(PartId { content_id })
- }
-
- /// Complete a multipart upload
<https://cloud.google.com/storage/docs/xml-api/post-object-complete>
- async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
- let upload_id = self.multipart_id.clone();
- let url = self.client.object_url(&self.path);
-
- let parts = completed_parts
- .into_iter()
- .enumerate()
- .map(|(part_number, part)| MultipartPart {
- e_tag: part.content_id,
- part_number: part_number + 1,
- })
- .collect();
-
- let credential = self.client.get_credential().await?;
- let upload_info = CompleteMultipartUpload { parts };
-
- let data = quick_xml::se::to_string(&upload_info)
- .context(InvalidPutResponseSnafu)?
- // We cannot disable the escaping that transforms "/" to "&quote;"
:(
- // https://github.com/tafia/quick-xml/issues/362
- // https://github.com/tafia/quick-xml/issues/350
- .replace("&quot;", "\"");
-
- self.client
- .client
- .request(Method::POST, &url)
- .bearer_auth(&credential.bearer)
- .query(&[("uploadId", upload_id)])
- .body(data)
- .send_retry(&self.client.retry_config)
- .await
- .context(PostRequestSnafu {
- path: self.path.as_ref(),
- })?;
-
- Ok(())
- }
-}
-
-#[async_trait]
-impl ObjectStore for GoogleCloudStorage {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
- let e_tag = self.client.put_request(location, bytes, &()).await?;
- Ok(PutResult { e_tag: Some(e_tag) })
- }
-
- async fn put_multipart(
- &self,
- location: &Path,
- ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
- let upload_id = self.client.multipart_initiate(location).await?;
-
- let inner = GCSMultipartUpload {
- client: Arc::clone(&self.client),
- path: location.clone(),
- multipart_id: upload_id.clone(),
- };
-
- Ok((upload_id, Box::new(WriteMultiPart::new(inner, 8))))
- }
-
- async fn abort_multipart(
- &self,
- location: &Path,
- multipart_id: &MultipartId,
- ) -> Result<()> {
- self.client
- .multipart_cleanup(location.as_ref(), multipart_id)
- .await?;
-
- Ok(())
- }
-
- async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
- self.client.get_opts(location, options).await
- }
-
- async fn delete(&self, location: &Path) -> Result<()> {
- self.client.delete_request(location).await
- }
-
- fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>>
{
- self.client.list(prefix)
- }
-
- async fn list_with_delimiter(&self, prefix: Option<&Path>) ->
Result<ListResult> {
- self.client.list_with_delimiter(prefix).await
- }
-
- async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
- self.client.copy_request(from, to, false).await
- }
-
- async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
- self.client.copy_request(from, to, true).await
- }
-}
-
/// Configure a connection to Google Cloud Storage using the specified
/// credentials.
///
@@ -697,7 +180,7 @@ impl AsRef<str> for GoogleConfigKey {
}
impl FromStr for GoogleConfigKey {
- type Err = super::Error;
+ type Err = crate::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
@@ -993,8 +476,6 @@ impl GoogleCloudStorageBuilder {
let bucket_name = self.bucket_name.ok_or(Error::MissingBucketName {})?;
- let client = self.client_options.client()?;
-
// First try to initialize from the service account information.
let service_account_credentials =
match (self.service_account_path, self.service_account_key) {
@@ -1063,140 +544,95 @@ impl GoogleCloudStorageBuilder {
)) as _
};
- let encoded_bucket_name =
- percent_encode(bucket_name.as_bytes(),
NON_ALPHANUMERIC).to_string();
+ let config = GoogleCloudStorageConfig {
+ base_url: gcs_base_url,
+ credentials,
+ bucket_name,
+ retry_config: self.retry_config,
+ client_options: self.client_options,
+ };
Ok(GoogleCloudStorage {
- client: Arc::new(GoogleCloudStorageClient {
- client,
- base_url: gcs_base_url,
- credentials,
- bucket_name,
- bucket_name_encoded: encoded_bucket_name,
- retry_config: self.retry_config,
- client_options: self.client_options,
- max_list_results: None,
- }),
+ client: Arc::new(GoogleCloudStorageClient::new(config)?),
})
}
}
#[cfg(test)]
-mod test {
- use bytes::Bytes;
+mod tests {
+ use super::*;
use std::collections::HashMap;
use std::io::Write;
use tempfile::NamedTempFile;
- use crate::tests::*;
-
- use super::*;
-
const FAKE_KEY: &str = r#"{"private_key": "private_key", "private_key_id":
"private_key_id", "client_email":"client_email", "disable_oauth":true}"#;
- const NON_EXISTENT_NAME: &str = "nonexistentname";
-
- #[tokio::test]
- async fn gcs_test() {
- crate::test_util::maybe_skip_integration!();
- let integration =
GoogleCloudStorageBuilder::from_env().build().unwrap();
-
- put_get_delete_list(&integration).await;
- list_uses_directories_correctly(&integration).await;
- list_with_delimiter(&integration).await;
- rename_and_copy(&integration).await;
- if integration.client.base_url == DEFAULT_GCS_BASE_URL {
- // Fake GCS server doesn't currently honor ifGenerationMatch
- // https://github.com/fsouza/fake-gcs-server/issues/994
- copy_if_not_exists(&integration).await;
- // Fake GCS server does not yet implement XML Multipart uploads
- // https://github.com/fsouza/fake-gcs-server/issues/852
- stream_get(&integration).await;
- // Fake GCS server doesn't currently honor preconditions
- get_opts(&integration).await;
- }
- }
-
- #[tokio::test]
- async fn gcs_test_get_nonexistent_location() {
- crate::test_util::maybe_skip_integration!();
- let integration =
GoogleCloudStorageBuilder::from_env().build().unwrap();
-
- let location = Path::from_iter([NON_EXISTENT_NAME]);
-
- let err = integration.get(&location).await.unwrap_err();
- assert!(
- matches!(err, crate::Error::NotFound { .. }),
- "unexpected error type: {err}"
- );
- }
-
- #[tokio::test]
- async fn gcs_test_get_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
- let config = GoogleCloudStorageBuilder::from_env();
- let integration =
config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
-
- let location = Path::from_iter([NON_EXISTENT_NAME]);
-
- let err = get_nonexistent_object(&integration, Some(location))
- .await
+ #[test]
+ fn gcs_test_service_account_key_and_path() {
+ let mut tfile = NamedTempFile::new().unwrap();
+ write!(tfile, "{FAKE_KEY}").unwrap();
+ let _ = GoogleCloudStorageBuilder::new()
+ .with_service_account_key(FAKE_KEY)
+ .with_service_account_path(tfile.path().to_str().unwrap())
+ .with_bucket_name("foo")
+ .build()
.unwrap_err();
-
- assert!(
- matches!(err, crate::Error::NotFound { .. }),
- "unexpected error type: {err}"
- );
}
- #[tokio::test]
- async fn gcs_test_delete_nonexistent_location() {
- crate::test_util::maybe_skip_integration!();
- let integration =
GoogleCloudStorageBuilder::from_env().build().unwrap();
-
- let location = Path::from_iter([NON_EXISTENT_NAME]);
-
- let err = integration.delete(&location).await.unwrap_err();
- assert!(
- matches!(err, crate::Error::NotFound { .. }),
- "unexpected error type: {err}"
- );
- }
-
- #[tokio::test]
- async fn gcs_test_delete_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
- let config = GoogleCloudStorageBuilder::from_env();
- let integration =
config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
+ #[test]
+ fn gcs_test_config_from_map() {
+ let google_service_account =
"object_store:fake_service_account".to_string();
+ let google_bucket_name = "object_store:fake_bucket".to_string();
+ let options = HashMap::from([
+ ("google_service_account", google_service_account.clone()),
+ ("google_bucket_name", google_bucket_name.clone()),
+ ]);
- let location = Path::from_iter([NON_EXISTENT_NAME]);
+ let builder = options
+ .iter()
+ .fold(GoogleCloudStorageBuilder::new(), |builder, (key, value)| {
+ builder.with_config(key.parse().unwrap(), value)
+ });
- let err = integration.delete(&location).await.unwrap_err();
- assert!(
- matches!(err, crate::Error::NotFound { .. }),
- "unexpected error type: {err}"
+ assert_eq!(
+ builder.service_account_path.unwrap(),
+ google_service_account.as_str()
);
+ assert_eq!(builder.bucket_name.unwrap(), google_bucket_name.as_str());
}
- #[tokio::test]
- async fn gcs_test_put_nonexistent_bucket() {
- crate::test_util::maybe_skip_integration!();
- let config = GoogleCloudStorageBuilder::from_env();
- let integration =
config.with_bucket_name(NON_EXISTENT_NAME).build().unwrap();
+ #[test]
+ fn gcs_test_config_aliases() {
+ // Service account path
+ for alias in [
+ "google_service_account",
+ "service_account",
+ "google_service_account_path",
+ "service_account_path",
+ ] {
+ let builder = GoogleCloudStorageBuilder::new()
+ .with_config(alias.parse().unwrap(), "/fake/path.json");
+ assert_eq!("/fake/path.json",
builder.service_account_path.unwrap());
+ }
- let location = Path::from_iter([NON_EXISTENT_NAME]);
- let data = Bytes::from("arbitrary data");
+ // Service account key
+ for alias in ["google_service_account_key", "service_account_key"] {
+ let builder = GoogleCloudStorageBuilder::new()
+ .with_config(alias.parse().unwrap(), FAKE_KEY);
+ assert_eq!(FAKE_KEY, builder.service_account_key.unwrap());
+ }
- let err = integration
- .put(&location, data)
- .await
- .unwrap_err()
- .to_string();
- assert!(
- err.contains("Client error with status 404 Not Found"),
- "{}",
- err
- )
+ // Bucket name
+ for alias in [
+ "google_bucket",
+ "google_bucket_name",
+ "bucket",
+ "bucket_name",
+ ] {
+ let builder = GoogleCloudStorageBuilder::new()
+ .with_config(alias.parse().unwrap(), "fake_bucket");
+ assert_eq!("fake_bucket", builder.bucket_name.unwrap());
+ }
}
#[tokio::test]
@@ -1247,40 +683,6 @@ mod test {
.unwrap();
}
- #[test]
- fn gcs_test_service_account_key_and_path() {
- let mut tfile = NamedTempFile::new().unwrap();
- write!(tfile, "{FAKE_KEY}").unwrap();
- let _ = GoogleCloudStorageBuilder::new()
- .with_service_account_key(FAKE_KEY)
- .with_service_account_path(tfile.path().to_str().unwrap())
- .with_bucket_name("foo")
- .build()
- .unwrap_err();
- }
-
- #[test]
- fn gcs_test_config_from_map() {
- let google_service_account =
"object_store:fake_service_account".to_string();
- let google_bucket_name = "object_store:fake_bucket".to_string();
- let options = HashMap::from([
- ("google_service_account", google_service_account.clone()),
- ("google_bucket_name", google_bucket_name.clone()),
- ]);
-
- let builder = options
- .iter()
- .fold(GoogleCloudStorageBuilder::new(), |builder, (key, value)| {
- builder.with_config(key.parse().unwrap(), value)
- });
-
- assert_eq!(
- builder.service_account_path.unwrap(),
- google_service_account.as_str()
- );
- assert_eq!(builder.bucket_name.unwrap(), google_bucket_name.as_str());
- }
-
#[test]
fn gcs_test_config_get_value() {
let google_service_account =
"object_store:fake_service_account".to_string();
@@ -1300,38 +702,4 @@ mod test {
google_bucket_name
);
}
-
- #[test]
- fn gcs_test_config_aliases() {
- // Service account path
- for alias in [
- "google_service_account",
- "service_account",
- "google_service_account_path",
- "service_account_path",
- ] {
- let builder = GoogleCloudStorageBuilder::new()
- .with_config(alias.parse().unwrap(), "/fake/path.json");
- assert_eq!("/fake/path.json",
builder.service_account_path.unwrap());
- }
-
- // Service account key
- for alias in ["google_service_account_key", "service_account_key"] {
- let builder = GoogleCloudStorageBuilder::new()
- .with_config(alias.parse().unwrap(), FAKE_KEY);
- assert_eq!(FAKE_KEY, builder.service_account_key.unwrap());
- }
-
- // Bucket name
- for alias in [
- "google_bucket",
- "google_bucket_name",
- "bucket",
- "bucket_name",
- ] {
- let builder = GoogleCloudStorageBuilder::new()
- .with_config(alias.parse().unwrap(), "fake_bucket");
- assert_eq!("fake_bucket", builder.bucket_name.unwrap());
- }
- }
}
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
new file mode 100644
index 0000000000..9141a9da8c
--- /dev/null
+++ b/object_store/src/gcp/client.rs
@@ -0,0 +1,446 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::get::GetClient;
+use crate::client::header::get_etag;
+use crate::client::list::ListClient;
+use crate::client::list_response::ListResponse;
+use crate::client::retry::RetryExt;
+use crate::client::GetOptionsExt;
+use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE};
+use crate::multipart::PartId;
+use crate::path::{Path, DELIMITER};
+use crate::{ClientOptions, GetOptions, ListResult, MultipartId, Result, RetryConfig};
+use async_trait::async_trait;
+use bytes::{Buf, Bytes};
+use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
+use reqwest::{header, Client, Method, Response, StatusCode};
+use serde::Serialize;
+use snafu::{ResultExt, Snafu};
+use std::sync::Arc;
+
+#[derive(Debug, Snafu)]
+enum Error {
+ #[snafu(display("Error performing list request: {}", source))]
+ ListRequest { source: crate::client::retry::Error },
+
+ #[snafu(display("Error getting list response body: {}", source))]
+ ListResponseBody { source: reqwest::Error },
+
+ #[snafu(display("Got invalid list response: {}", source))]
+ InvalidListResponse { source: quick_xml::de::DeError },
+
+ #[snafu(display("Error performing get request {}: {}", path, source))]
+ GetRequest {
+ source: crate::client::retry::Error,
+ path: String,
+ },
+
+ #[snafu(display("Error performing delete request {}: {}", path, source))]
+ DeleteRequest {
+ source: crate::client::retry::Error,
+ path: String,
+ },
+
+ #[snafu(display("Error performing put request {}: {}", path, source))]
+ PutRequest {
+ source: crate::client::retry::Error,
+ path: String,
+ },
+
+ #[snafu(display("Error getting put response body: {}", source))]
+ PutResponseBody { source: reqwest::Error },
+
+ #[snafu(display("Got invalid put response: {}", source))]
+ InvalidPutResponse { source: quick_xml::de::DeError },
+
+ #[snafu(display("Error performing post request {}: {}", path, source))]
+ PostRequest {
+ source: crate::client::retry::Error,
+ path: String,
+ },
+
+ #[snafu(display("Unable to extract metadata from headers: {}", source))]
+ Metadata {
+ source: crate::client::header::Error,
+ },
+}
+
+impl From<Error> for crate::Error {
+ fn from(err: Error) -> Self {
+ match err {
+ Error::GetRequest { source, path }
+ | Error::DeleteRequest { source, path }
+ | Error::PutRequest { source, path } => source.error(STORE, path),
+ _ => Self::Generic {
+ store: STORE,
+ source: Box::new(err),
+ },
+ }
+ }
+}
+
+#[derive(Debug)]
+pub struct GoogleCloudStorageConfig {
+ pub base_url: String,
+
+ pub credentials: GcpCredentialProvider,
+
+ pub bucket_name: String,
+
+ pub retry_config: RetryConfig,
+
+ pub client_options: ClientOptions,
+}
+
+#[derive(Debug)]
+pub struct GoogleCloudStorageClient {
+ config: GoogleCloudStorageConfig,
+
+ client: Client,
+
+ bucket_name_encoded: String,
+
+ // TODO: Hook this up in tests
+ max_list_results: Option<String>,
+}
+
+impl GoogleCloudStorageClient {
+ pub fn new(config: GoogleCloudStorageConfig) -> Result<Self> {
+ let client = config.client_options.client()?;
+ let bucket_name_encoded =
+ percent_encode(config.bucket_name.as_bytes(), NON_ALPHANUMERIC).to_string();
+
+ Ok(Self {
+ config,
+ client,
+ bucket_name_encoded,
+ max_list_results: None,
+ })
+ }
+
+ pub fn config(&self) -> &GoogleCloudStorageConfig {
+ &self.config
+ }
+
+ async fn get_credential(&self) -> Result<Arc<GcpCredential>> {
+ self.config.credentials.get_credential().await
+ }
+
+ pub fn object_url(&self, path: &Path) -> String {
+ let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
+ format!(
+ "{}/{}/{}",
+ self.config.base_url, self.bucket_name_encoded, encoded
+ )
+ }
+
+ /// Perform a put request <https://cloud.google.com/storage/docs/xml-api/put-object-upload>
+ ///
+ /// Returns the new ETag
+ pub async fn put_request<T: Serialize + ?Sized + Sync>(
+ &self,
+ path: &Path,
+ payload: Bytes,
+ query: &T,
+ ) -> Result<String> {
+ let credential = self.get_credential().await?;
+ let url = self.object_url(path);
+
+ let content_type = self
+ .config
+ .client_options
+ .get_content_type(path)
+ .unwrap_or("application/octet-stream");
+
+ let response = self
+ .client
+ .request(Method::PUT, url)
+ .query(query)
+ .bearer_auth(&credential.bearer)
+ .header(header::CONTENT_TYPE, content_type)
+ .header(header::CONTENT_LENGTH, payload.len())
+ .body(payload)
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(PutRequestSnafu {
+ path: path.as_ref(),
+ })?;
+
+ Ok(get_etag(response.headers()).context(MetadataSnafu)?)
+ }
+
+ /// Initiate a multi-part upload <https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
+ pub async fn multipart_initiate(&self, path: &Path) -> Result<MultipartId> {
+ let credential = self.get_credential().await?;
+ let url = self.object_url(path);
+
+ let content_type = self
+ .config
+ .client_options
+ .get_content_type(path)
+ .unwrap_or("application/octet-stream");
+
+ let response = self
+ .client
+ .request(Method::POST, &url)
+ .bearer_auth(&credential.bearer)
+ .header(header::CONTENT_TYPE, content_type)
+ .header(header::CONTENT_LENGTH, "0")
+ .query(&[("uploads", "")])
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(PutRequestSnafu {
+ path: path.as_ref(),
+ })?;
+
+ let data = response.bytes().await.context(PutResponseBodySnafu)?;
+ let result: InitiateMultipartUploadResult =
+ quick_xml::de::from_reader(data.as_ref().reader())
+ .context(InvalidPutResponseSnafu)?;
+
+ Ok(result.upload_id)
+ }
+
+ /// Cleanup unused parts <https://cloud.google.com/storage/docs/xml-api/delete-multipart>
+ pub async fn multipart_cleanup(
+ &self,
+ path: &Path,
+ multipart_id: &MultipartId,
+ ) -> Result<()> {
+ let credential = self.get_credential().await?;
+ let url = self.object_url(path);
+
+ self.client
+ .request(Method::DELETE, &url)
+ .bearer_auth(&credential.bearer)
+ .header(header::CONTENT_TYPE, "application/octet-stream")
+ .header(header::CONTENT_LENGTH, "0")
+ .query(&[("uploadId", multipart_id)])
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(PutRequestSnafu {
+ path: path.as_ref(),
+ })?;
+
+ Ok(())
+ }
+
+ pub async fn multipart_complete(
+ &self,
+ path: &Path,
+ multipart_id: &MultipartId,
+ completed_parts: Vec<PartId>,
+ ) -> Result<()> {
+ let upload_id = multipart_id.clone();
+ let url = self.object_url(path);
+
+ let parts = completed_parts
+ .into_iter()
+ .enumerate()
+ .map(|(part_number, part)| MultipartPart {
+ e_tag: part.content_id,
+ part_number: part_number + 1,
+ })
+ .collect();
+
+ let credential = self.get_credential().await?;
+ let upload_info = CompleteMultipartUpload { parts };
+
+ let data = quick_xml::se::to_string(&upload_info)
+ .context(InvalidPutResponseSnafu)?
+ // We cannot disable the escaping that transforms "/" to "&quote;" :(
+ // https://github.com/tafia/quick-xml/issues/362
+ // https://github.com/tafia/quick-xml/issues/350
+ .replace("&quot;", "\"");
+
+ self.client
+ .request(Method::POST, &url)
+ .bearer_auth(&credential.bearer)
+ .query(&[("uploadId", upload_id)])
+ .body(data)
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(PostRequestSnafu {
+ path: path.as_ref(),
+ })?;
+
+ Ok(())
+ }
+
+ /// Perform a delete request <https://cloud.google.com/storage/docs/xml-api/delete-object>
+ pub async fn delete_request(&self, path: &Path) -> Result<()> {
+ let credential = self.get_credential().await?;
+ let url = self.object_url(path);
+
+ let builder = self.client.request(Method::DELETE, url);
+ builder
+ .bearer_auth(&credential.bearer)
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(DeleteRequestSnafu {
+ path: path.as_ref(),
+ })?;
+
+ Ok(())
+ }
+
+ /// Perform a copy request <https://cloud.google.com/storage/docs/xml-api/put-object-copy>
+ pub async fn copy_request(
+ &self,
+ from: &Path,
+ to: &Path,
+ if_not_exists: bool,
+ ) -> Result<()> {
+ let credential = self.get_credential().await?;
+ let url = self.object_url(to);
+
+ let from = utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC);
+ let source = format!("{}/{}", self.bucket_name_encoded, from);
+
+ let mut builder = self
+ .client
+ .request(Method::PUT, url)
+ .header("x-goog-copy-source", source);
+
+ if if_not_exists {
+ builder = builder.header("x-goog-if-generation-match", 0);
+ }
+
+ builder
+ .bearer_auth(&credential.bearer)
+ // Needed if reqwest is compiled with native-tls instead of rustls-tls
+ // See https://github.com/apache/arrow-rs/pull/3921
+ .header(header::CONTENT_LENGTH, 0)
+ .send_retry(&self.config.retry_config)
+ .await
+ .map_err(|err| match err.status() {
+ Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists {
+ source: Box::new(err),
+ path: to.to_string(),
+ },
+ _ => err.error(STORE, from.to_string()),
+ })?;
+
+ Ok(())
+ }
+}
+
+#[async_trait]
+impl GetClient for GoogleCloudStorageClient {
+ const STORE: &'static str = STORE;
+
+ /// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
+ async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
+ let credential = self.get_credential().await?;
+ let url = self.object_url(path);
+
+ let method = match options.head {
+ true => Method::HEAD,
+ false => Method::GET,
+ };
+
+ let mut request = self.client.request(method, url).with_get_options(options);
+
+ if !credential.bearer.is_empty() {
+ request = request.bearer_auth(&credential.bearer);
+ }
+
+ let response = request
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(GetRequestSnafu {
+ path: path.as_ref(),
+ })?;
+
+ Ok(response)
+ }
+}
+
+#[async_trait]
+impl ListClient for GoogleCloudStorageClient {
+ /// Perform a list request <https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
+ async fn list_request(
+ &self,
+ prefix: Option<&str>,
+ delimiter: bool,
+ page_token: Option<&str>,
+ offset: Option<&str>,
+ ) -> Result<(ListResult, Option<String>)> {
+ assert!(offset.is_none()); // Not yet supported
+
+ let credential = self.get_credential().await?;
+ let url = format!("{}/{}", self.config.base_url, self.bucket_name_encoded);
+
+ let mut query = Vec::with_capacity(5);
+ query.push(("list-type", "2"));
+ if delimiter {
+ query.push(("delimiter", DELIMITER))
+ }
+
+ if let Some(prefix) = &prefix {
+ query.push(("prefix", prefix))
+ }
+
+ if let Some(page_token) = page_token {
+ query.push(("continuation-token", page_token))
+ }
+
+ if let Some(max_results) = &self.max_list_results {
+ query.push(("max-keys", max_results))
+ }
+
+ let response = self
+ .client
+ .request(Method::GET, url)
+ .query(&query)
+ .bearer_auth(&credential.bearer)
+ .send_retry(&self.config.retry_config)
+ .await
+ .context(ListRequestSnafu)?
+ .bytes()
+ .await
+ .context(ListResponseBodySnafu)?;
+
+ let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
+ .context(InvalidListResponseSnafu)?;
+
+ let token = response.next_continuation_token.take();
+ Ok((response.try_into()?, token))
+ }
+}
+
+#[derive(serde::Deserialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+struct InitiateMultipartUploadResult {
+ upload_id: String,
+}
+
+#[derive(serde::Serialize, Debug)]
+#[serde(rename_all = "PascalCase", rename(serialize = "Part"))]
+struct MultipartPart {
+ #[serde(rename = "PartNumber")]
+ part_number: usize,
+ e_tag: String,
+}
+
+#[derive(serde::Serialize, Debug)]
+#[serde(rename_all = "PascalCase")]
+struct CompleteMultipartUpload {
+ #[serde(rename = "Part", default)]
+ parts: Vec<MultipartPart>,
+}
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 97755c07c6..7c69d28874 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -29,176 +29,34 @@
//! to abort the upload and drop those unneeded parts. In addition, you may wish to
//! consider implementing automatic clean up of unused parts that are older than one
//! week.
-use std::str::FromStr;
use std::sync::Arc;
-use async_trait::async_trait;
-use bytes::{Buf, Bytes};
-use futures::stream::BoxStream;
-use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
-use reqwest::{header, Client, Method, Response, StatusCode};
-use serde::{Deserialize, Serialize};
-use snafu::{OptionExt, ResultExt, Snafu};
-use tokio::io::AsyncWrite;
-use url::Url;
-
-use crate::client::get::{GetClient, GetClientExt};
-use crate::client::list::{ListClient, ListClientExt};
-use crate::client::list_response::ListResponse;
-use crate::client::retry::RetryExt;
-use crate::client::{
- ClientConfigKey, CredentialProvider, GetOptionsExt, StaticCredentialProvider,
- TokenCredentialProvider,
-};
+use crate::client::CredentialProvider;
use crate::{
multipart::{PartId, PutPart, WriteMultiPart},
- path::{Path, DELIMITER},
- ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
- ObjectStore, PutResult, Result, RetryConfig,
+ path::Path,
+ GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult,
+ Result,
};
+use async_trait::async_trait;
+use bytes::Bytes;
+use client::GoogleCloudStorageClient;
+use futures::stream::BoxStream;
+use tokio::io::AsyncWrite;
-use credential::{InstanceCredentialProvider, ServiceAccountCredentials};
+use crate::client::get::GetClientExt;
+use crate::client::list::ListClientExt;
+pub use builder::{GoogleCloudStorageBuilder, GoogleConfigKey};
+pub use credential::GcpCredential;
+mod builder;
+mod client;
mod credential;
const STORE: &str = "GCS";
/// [`CredentialProvider`] for [`GoogleCloudStorage`]
pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = GcpCredential>>;
-use crate::client::header::get_etag;
-use crate::gcp::credential::{ApplicationDefaultCredentials, DEFAULT_GCS_BASE_URL};
-pub use credential::GcpCredential;
-
-#[derive(Debug, Snafu)]
-enum Error {
- #[snafu(display("Got invalid XML response for {} {}: {}", method, url, source))]
- InvalidXMLResponse {
- source: quick_xml::de::DeError,
- method: String,
- url: String,
- data: Bytes,
- },
-
- #[snafu(display("Error performing list request: {}", source))]
- ListRequest { source: crate::client::retry::Error },
-
- #[snafu(display("Error getting list response body: {}", source))]
- ListResponseBody { source: reqwest::Error },
-
- #[snafu(display("Got invalid list response: {}", source))]
- InvalidListResponse { source: quick_xml::de::DeError },
-
- #[snafu(display("Error performing get request {}: {}", path, source))]
- GetRequest {
- source: crate::client::retry::Error,
- path: String,
- },
-
- #[snafu(display("Error getting get response body {}: {}", path, source))]
- GetResponseBody {
- source: reqwest::Error,
- path: String,
- },
-
- #[snafu(display("Error performing delete request {}: {}", path, source))]
- DeleteRequest {
- source: crate::client::retry::Error,
- path: String,
- },
-
- #[snafu(display("Error performing put request {}: {}", path, source))]
- PutRequest {
- source: crate::client::retry::Error,
- path: String,
- },
-
- #[snafu(display("Error getting put response body: {}", source))]
- PutResponseBody { source: reqwest::Error },
-
- #[snafu(display("Got invalid put response: {}", source))]
- InvalidPutResponse { source: quick_xml::de::DeError },
-
- #[snafu(display("Error performing post request {}: {}", path, source))]
- PostRequest {
- source: crate::client::retry::Error,
- path: String,
- },
-
- #[snafu(display("Error decoding object size: {}", source))]
- InvalidSize { source: std::num::ParseIntError },
-
- #[snafu(display("Missing bucket name"))]
- MissingBucketName {},
-
- #[snafu(display(
- "One of service account path or service account key may be provided."
- ))]
- ServiceAccountPathAndKeyProvided,
-
- #[snafu(display("GCP credential error: {}", source))]
- Credential { source: credential::Error },
-
- #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
- UnableToParseUrl {
- source: url::ParseError,
- url: String,
- },
-
- #[snafu(display(
- "Unknown url scheme cannot be parsed into storage location: {}",
- scheme
- ))]
- UnknownUrlScheme { scheme: String },
-
- #[snafu(display("URL did not match any known pattern for scheme: {}", url))]
- UrlNotRecognised { url: String },
-
- #[snafu(display("Configuration key: '{}' is not known.", key))]
- UnknownConfigurationKey { key: String },
-
- #[snafu(display("Unable to extract metadata from headers: {}", source))]
- Metadata {
- source: crate::client::header::Error,
- },
-}
-
-impl From<Error> for super::Error {
- fn from(err: Error) -> Self {
- match err {
- Error::GetRequest { source, path }
- | Error::DeleteRequest { source, path }
- | Error::PutRequest { source, path } => source.error(STORE, path),
- Error::UnknownConfigurationKey { key } => {
- Self::UnknownConfigurationKey { store: STORE, key }
- }
- _ => Self::Generic {
- store: STORE,
- source: Box::new(err),
- },
- }
- }
-}
-
-#[derive(serde::Deserialize, Debug)]
-#[serde(rename_all = "PascalCase")]
-struct InitiateMultipartUploadResult {
- upload_id: String,
-}
-
-#[derive(serde::Serialize, Debug)]
-#[serde(rename_all = "PascalCase", rename(serialize = "Part"))]
-struct MultipartPart {
- #[serde(rename = "PartNumber")]
- part_number: usize,
- e_tag: String,
-}
-
-#[derive(serde::Serialize, Debug)]
-#[serde(rename_all = "PascalCase")]
-struct CompleteMultipartUpload {
- #[serde(rename = "Part", default)]
- parts: Vec<MultipartPart>,
-}
/// Interface for [Google Cloud Storage](https://cloud.google.com/storage/).
#[derive(Debug)]
@@ -208,271 +66,18 @@ pub struct GoogleCloudStorage {
impl std::fmt::Display for GoogleCloudStorage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "GoogleCloudStorage({})", self.client.bucket_name)
+ write!(
+ f,
+ "GoogleCloudStorage({})",
+ self.client.config().bucket_name
+ )
}
}
impl GoogleCloudStorage {
/// Returns the [`GcpCredentialProvider`] used by [`GoogleCloudStorage`]
pub fn credentials(&self) -> &GcpCredentialProvider {
- &self.client.credentials
- }
-}
-
-#[derive(Debug)]
-struct GoogleCloudStorageClient {
- client: Client,
- base_url: String,
-
- credentials: GcpCredentialProvider,
-
- bucket_name: String,
- bucket_name_encoded: String,
-
- retry_config: RetryConfig,
- client_options: ClientOptions,
-
- // TODO: Hook this up in tests
- max_list_results: Option<String>,
-}
-
-impl GoogleCloudStorageClient {
- async fn get_credential(&self) -> Result<Arc<GcpCredential>> {
- self.credentials.get_credential().await
- }
-
- fn object_url(&self, path: &Path) -> String {
- let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
- format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, encoded)
- }
-
- /// Perform a put request <https://cloud.google.com/storage/docs/xml-api/put-object-upload>
- ///
- /// Returns the new ETag
- async fn put_request<T: Serialize + ?Sized + Sync>(
- &self,
- path: &Path,
- payload: Bytes,
- query: &T,
- ) -> Result<String> {
- let credential = self.get_credential().await?;
- let url = self.object_url(path);
-
- let content_type = self
- .client_options
- .get_content_type(path)
- .unwrap_or("application/octet-stream");
-
- let response = self
- .client
- .request(Method::PUT, url)
- .query(query)
- .bearer_auth(&credential.bearer)
- .header(header::CONTENT_TYPE, content_type)
- .header(header::CONTENT_LENGTH, payload.len())
- .body(payload)
- .send_retry(&self.retry_config)
- .await
- .context(PutRequestSnafu {
- path: path.as_ref(),
- })?;
-
- Ok(get_etag(response.headers()).context(MetadataSnafu)?)
- }
-
- /// Initiate a multi-part upload <https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
- async fn multipart_initiate(&self, path: &Path) -> Result<MultipartId> {
- let credential = self.get_credential().await?;
- let url = format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, path);
-
- let content_type = self
- .client_options
- .get_content_type(path)
- .unwrap_or("application/octet-stream");
-
- let response = self
- .client
- .request(Method::POST, &url)
- .bearer_auth(&credential.bearer)
- .header(header::CONTENT_TYPE, content_type)
- .header(header::CONTENT_LENGTH, "0")
- .query(&[("uploads", "")])
- .send_retry(&self.retry_config)
- .await
- .context(PutRequestSnafu {
- path: path.as_ref(),
- })?;
-
- let data = response.bytes().await.context(PutResponseBodySnafu)?;
- let result: InitiateMultipartUploadResult =
- quick_xml::de::from_reader(data.as_ref().reader())
- .context(InvalidPutResponseSnafu)?;
-
- Ok(result.upload_id)
- }
-
- /// Cleanup unused parts <https://cloud.google.com/storage/docs/xml-api/delete-multipart>
- async fn multipart_cleanup(
- &self,
- path: &str,
- multipart_id: &MultipartId,
- ) -> Result<()> {
- let credential = self.get_credential().await?;
- let url = format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, path);
-
- self.client
- .request(Method::DELETE, &url)
- .bearer_auth(&credential.bearer)
- .header(header::CONTENT_TYPE, "application/octet-stream")
- .header(header::CONTENT_LENGTH, "0")
- .query(&[("uploadId", multipart_id)])
- .send_retry(&self.retry_config)
- .await
- .context(PutRequestSnafu { path })?;
-
- Ok(())
- }
-
- /// Perform a delete request <https://cloud.google.com/storage/docs/xml-api/delete-object>
- async fn delete_request(&self, path: &Path) -> Result<()> {
- let credential = self.get_credential().await?;
- let url = self.object_url(path);
-
- let builder = self.client.request(Method::DELETE, url);
- builder
- .bearer_auth(&credential.bearer)
- .send_retry(&self.retry_config)
- .await
- .context(DeleteRequestSnafu {
- path: path.as_ref(),
- })?;
-
- Ok(())
- }
-
- /// Perform a copy request <https://cloud.google.com/storage/docs/xml-api/put-object-copy>
- async fn copy_request(
- &self,
- from: &Path,
- to: &Path,
- if_not_exists: bool,
- ) -> Result<()> {
- let credential = self.get_credential().await?;
- let url = self.object_url(to);
-
- let from = utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC);
- let source = format!("{}/{}", self.bucket_name_encoded, from);
-
- let mut builder = self
- .client
- .request(Method::PUT, url)
- .header("x-goog-copy-source", source);
-
- if if_not_exists {
- builder = builder.header("x-goog-if-generation-match", 0);
- }
-
- builder
- .bearer_auth(&credential.bearer)
- // Needed if reqwest is compiled with native-tls instead of rustls-tls
- // See https://github.com/apache/arrow-rs/pull/3921
- .header(header::CONTENT_LENGTH, 0)
- .send_retry(&self.retry_config)
- .await
- .map_err(|err| match err.status() {
- Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists {
- source: Box::new(err),
- path: to.to_string(),
- },
- _ => err.error(STORE, from.to_string()),
- })?;
-
- Ok(())
- }
-}
-
-#[async_trait]
-impl GetClient for GoogleCloudStorageClient {
- const STORE: &'static str = STORE;
-
- /// Perform a get request <https://cloud.google.com/storage/docs/xml-api/get-object-download>
- async fn get_request(&self, path: &Path, options: GetOptions) -> Result<Response> {
- let credential = self.get_credential().await?;
- let url = self.object_url(path);
-
- let method = match options.head {
- true => Method::HEAD,
- false => Method::GET,
- };
-
- let mut request = self.client.request(method, url).with_get_options(options);
-
- if !credential.bearer.is_empty() {
- request = request.bearer_auth(&credential.bearer);
- }
-
- let response =
- request
- .send_retry(&self.retry_config)
- .await
- .context(GetRequestSnafu {
- path: path.as_ref(),
- })?;
-
- Ok(response)
- }
-}
-
-#[async_trait]
-impl ListClient for GoogleCloudStorageClient {
- /// Perform a list request <https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
- async fn list_request(
- &self,
- prefix: Option<&str>,
- delimiter: bool,
- page_token: Option<&str>,
- offset: Option<&str>,
- ) -> Result<(ListResult, Option<String>)> {
- assert!(offset.is_none()); // Not yet supported
-
- let credential = self.get_credential().await?;
- let url = format!("{}/{}", self.base_url, self.bucket_name_encoded);
-
- let mut query = Vec::with_capacity(5);
- query.push(("list-type", "2"));
- if delimiter {
- query.push(("delimiter", DELIMITER))
- }
-
- if let Some(prefix) = &prefix {
- query.push(("prefix", prefix))
- }
-
- if let Some(page_token) = page_token {
- query.push(("continuation-token", page_token))
- }
-
- if let Some(max_results) = &self.max_list_results {
- query.push(("max-keys", max_results))
- }
-
- let response = self
- .client
- .request(Method::GET, url)
- .query(&query)
- .bearer_auth(&credential.bearer)
- .send_retry(&self.retry_config)
- .await
- .context(ListRequestSnafu)?
- .bytes()
- .await
- .context(ListResponseBodySnafu)?;
-
- let mut response: ListResponse = quick_xml::de::from_reader(response.reader())
- .context(InvalidListResponseSnafu)?;
-
- let token = response.next_continuation_token.take();
- Ok((response.try_into()?, token))
+ &self.client.config().credentials
}
}
@@ -504,41 +109,9 @@ impl PutPart for GCSMultipartUpload {
/// Complete a multipart upload <https://cloud.google.com/storage/docs/xml-api/post-object-complete>
async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
- let upload_id = self.multipart_id.clone();
- let url = self.client.object_url(&self.path);
-
- let parts = completed_parts
- .into_iter()
- .enumerate()
- .map(|(part_number, part)| MultipartPart {
- e_tag: part.content_id,
- part_number: part_number + 1,
- })
- .collect();
-
- let credential = self.client.get_credential().await?;
- let upload_info = CompleteMultipartUpload { parts };
-
- let data = quick_xml::se::to_string(&upload_info)
- .context(InvalidPutResponseSnafu)?
- // We cannot disable the escaping that transforms "/" to "&quote;" :(
- // https://github.com/tafia/quick-xml/issues/362
- // https://github.com/tafia/quick-xml/issues/350
- .replace("&quot;", "\"");
-
self.client
- .client
- .request(Method::POST, &url)
- .bearer_auth(&credential.bearer)
- .query(&[("uploadId", upload_id)])
- .body(data)
- .send_retry(&self.client.retry_config)
+ .multipart_complete(&self.path, &self.multipart_id, completed_parts)
.await
- .context(PostRequestSnafu {
- path: self.path.as_ref(),
- })?;
-
- Ok(())
}
}
@@ -570,7 +143,7 @@ impl ObjectStore for GoogleCloudStorage {
multipart_id: &MultipartId,
) -> Result<()> {
self.client
- .multipart_cleanup(location.as_ref(), multipart_id)
+ .multipart_cleanup(location, multipart_id)
.await?;
Ok(())
@@ -601,498 +174,16 @@ impl ObjectStore for GoogleCloudStorage {
}
}
-/// Configure a connection to Google Cloud Storage using the specified
-/// credentials.
-///
-/// # Example
-/// ```
-/// # let BUCKET_NAME = "foo";
-/// # let SERVICE_ACCOUNT_PATH = "/tmp/foo.json";
-/// # use object_store::gcp::GoogleCloudStorageBuilder;
-/// let gcs = GoogleCloudStorageBuilder::new()
-/// .with_service_account_path(SERVICE_ACCOUNT_PATH)
-/// .with_bucket_name(BUCKET_NAME)
-/// .build();
-/// ```
-#[derive(Debug, Clone)]
-pub struct GoogleCloudStorageBuilder {
- /// Bucket name
- bucket_name: Option<String>,
- /// Url
- url: Option<String>,
- /// Path to the service account file
- service_account_path: Option<String>,
- /// The serialized service account key
- service_account_key: Option<String>,
- /// Path to the application credentials file.
- application_credentials_path: Option<String>,
- /// Retry config
- retry_config: RetryConfig,
- /// Client options
- client_options: ClientOptions,
- /// Credentials
- credentials: Option<GcpCredentialProvider>,
-}
-
-/// Configuration keys for [`GoogleCloudStorageBuilder`]
-///
-/// Configuration via keys can be done via
[`GoogleCloudStorageBuilder::with_config`]
-///
-/// # Example
-/// ```
-/// # use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};
-/// let builder = GoogleCloudStorageBuilder::new()
-/// .with_config("google_service_account".parse().unwrap(),
"my-service-account")
-/// .with_config(GoogleConfigKey::Bucket, "my-bucket");
-/// ```
-#[derive(PartialEq, Eq, Hash, Clone, Debug, Copy, Serialize, Deserialize)]
-#[non_exhaustive]
-pub enum GoogleConfigKey {
- /// Path to the service account file
- ///
- /// Supported keys:
- /// - `google_service_account`
- /// - `service_account`
- /// - `google_service_account_path`
- /// - `service_account_path`
- ServiceAccount,
-
- /// The serialized service account key.
- ///
- /// Supported keys:
- /// - `google_service_account_key`
- /// - `service_account_key`
- ServiceAccountKey,
-
- /// Bucket name
- ///
- /// See [`GoogleCloudStorageBuilder::with_bucket_name`] for details.
- ///
- /// Supported keys:
- /// - `google_bucket`
- /// - `google_bucket_name`
- /// - `bucket`
- /// - `bucket_name`
- Bucket,
-
- /// Application credentials path
- ///
- /// See [`GoogleCloudStorageBuilder::with_application_credentials`].
- ApplicationCredentials,
-
- /// Client options
- Client(ClientConfigKey),
-}
-
-impl AsRef<str> for GoogleConfigKey {
- fn as_ref(&self) -> &str {
- match self {
- Self::ServiceAccount => "google_service_account",
- Self::ServiceAccountKey => "google_service_account_key",
- Self::Bucket => "google_bucket",
- Self::ApplicationCredentials => "google_application_credentials",
- Self::Client(key) => key.as_ref(),
- }
- }
-}
-
-impl FromStr for GoogleConfigKey {
- type Err = super::Error;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s {
- "google_service_account"
- | "service_account"
- | "google_service_account_path"
- | "service_account_path" => Ok(Self::ServiceAccount),
- "google_service_account_key" | "service_account_key" => {
- Ok(Self::ServiceAccountKey)
- }
- "google_bucket" | "google_bucket_name" | "bucket" | "bucket_name"
=> {
- Ok(Self::Bucket)
- }
- "google_application_credentials" =>
Ok(Self::ApplicationCredentials),
- _ => match s.parse() {
- Ok(key) => Ok(Self::Client(key)),
- Err(_) => Err(Error::UnknownConfigurationKey { key: s.into()
}.into()),
- },
- }
- }
-}
-
-impl Default for GoogleCloudStorageBuilder {
- fn default() -> Self {
- Self {
- bucket_name: None,
- service_account_path: None,
- service_account_key: None,
- application_credentials_path: None,
- retry_config: Default::default(),
- client_options: ClientOptions::new().with_allow_http(true),
- url: None,
- credentials: None,
- }
- }
-}
-
-impl GoogleCloudStorageBuilder {
- /// Create a new [`GoogleCloudStorageBuilder`] with default values.
- pub fn new() -> Self {
- Default::default()
- }
-
- /// Create an instance of [`GoogleCloudStorageBuilder`] with values
pre-populated from environment variables.
- ///
- /// Variables extracted from environment:
- /// * GOOGLE_SERVICE_ACCOUNT: location of service account file
- /// * GOOGLE_SERVICE_ACCOUNT_PATH: (alias) location of service account file
- /// * SERVICE_ACCOUNT: (alias) location of service account file
- /// * GOOGLE_SERVICE_ACCOUNT_KEY: JSON serialized service account key
- /// * GOOGLE_BUCKET: bucket name
- /// * GOOGLE_BUCKET_NAME: (alias) bucket name
- ///
- /// # Example
- /// ```
- /// use object_store::gcp::GoogleCloudStorageBuilder;
- ///
- /// let gcs = GoogleCloudStorageBuilder::from_env()
- /// .with_bucket_name("foo")
- /// .build();
- /// ```
- pub fn from_env() -> Self {
- let mut builder = Self::default();
-
- if let Ok(service_account_path) = std::env::var("SERVICE_ACCOUNT") {
- builder.service_account_path = Some(service_account_path);
- }
-
- for (os_key, os_value) in std::env::vars_os() {
- if let (Some(key), Some(value)) = (os_key.to_str(),
os_value.to_str()) {
- if key.starts_with("GOOGLE_") {
- if let Ok(config_key) = key.to_ascii_lowercase().parse() {
- builder = builder.with_config(config_key, value);
- }
- }
- }
- }
-
- builder
- }
-
- /// Parse available connection info form a well-known storage URL.
- ///
- /// The supported url schemes are:
- ///
- /// - `gs://<bucket>/<path>`
- ///
- /// Note: Settings derived from the URL will override any others set on
this builder
- ///
- /// # Example
- /// ```
- /// use object_store::gcp::GoogleCloudStorageBuilder;
- ///
- /// let gcs = GoogleCloudStorageBuilder::from_env()
- /// .with_url("gs://bucket/path")
- /// .build();
- /// ```
- pub fn with_url(mut self, url: impl Into<String>) -> Self {
- self.url = Some(url.into());
- self
- }
-
- /// Set an option on the builder via a key - value pair.
- pub fn with_config(mut self, key: GoogleConfigKey, value: impl
Into<String>) -> Self {
- match key {
- GoogleConfigKey::ServiceAccount => {
- self.service_account_path = Some(value.into())
- }
- GoogleConfigKey::ServiceAccountKey => {
- self.service_account_key = Some(value.into())
- }
- GoogleConfigKey::Bucket => self.bucket_name = Some(value.into()),
- GoogleConfigKey::ApplicationCredentials => {
- self.application_credentials_path = Some(value.into())
- }
- GoogleConfigKey::Client(key) => {
- self.client_options = self.client_options.with_config(key,
value)
- }
- };
- self
- }
-
- /// Set an option on the builder via a key - value pair.
- #[deprecated(note = "Use with_config")]
- pub fn try_with_option(
- self,
- key: impl AsRef<str>,
- value: impl Into<String>,
- ) -> Result<Self> {
- Ok(self.with_config(key.as_ref().parse()?, value))
- }
-
- /// Hydrate builder from key value pairs
- #[deprecated(note = "Use with_config")]
- #[allow(deprecated)]
- pub fn try_with_options<
- I: IntoIterator<Item = (impl AsRef<str>, impl Into<String>)>,
- >(
- mut self,
- options: I,
- ) -> Result<Self> {
- for (key, value) in options {
- self = self.try_with_option(key, value)?;
- }
- Ok(self)
- }
-
- /// Get config value via a [`GoogleConfigKey`].
- ///
- /// # Example
- /// ```
- /// use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey};
- ///
- /// let builder = GoogleCloudStorageBuilder::from_env()
- /// .with_service_account_key("foo");
- /// let service_account_key =
builder.get_config_value(&GoogleConfigKey::ServiceAccountKey).unwrap_or_default();
- /// assert_eq!("foo", &service_account_key);
- /// ```
- pub fn get_config_value(&self, key: &GoogleConfigKey) -> Option<String> {
- match key {
- GoogleConfigKey::ServiceAccount =>
self.service_account_path.clone(),
- GoogleConfigKey::ServiceAccountKey =>
self.service_account_key.clone(),
- GoogleConfigKey::Bucket => self.bucket_name.clone(),
- GoogleConfigKey::ApplicationCredentials => {
- self.application_credentials_path.clone()
- }
- GoogleConfigKey::Client(key) =>
self.client_options.get_config_value(key),
- }
- }
-
- /// Sets properties on this builder based on a URL
- ///
- /// This is a separate member function to allow fallible computation to
- /// be deferred until [`Self::build`] which in turn allows deriving
[`Clone`]
- fn parse_url(&mut self, url: &str) -> Result<()> {
- let parsed = Url::parse(url).context(UnableToParseUrlSnafu { url })?;
- let host = parsed.host_str().context(UrlNotRecognisedSnafu { url })?;
-
- let validate = |s: &str| match s.contains('.') {
- true => Err(UrlNotRecognisedSnafu { url }.build()),
- false => Ok(s.to_string()),
- };
-
- match parsed.scheme() {
- "gs" => self.bucket_name = Some(validate(host)?),
- scheme => return Err(UnknownUrlSchemeSnafu { scheme
}.build().into()),
- }
- Ok(())
- }
-
- /// Set the bucket name (required)
- pub fn with_bucket_name(mut self, bucket_name: impl Into<String>) -> Self {
- self.bucket_name = Some(bucket_name.into());
- self
- }
-
- /// Set the path to the service account file.
- ///
- /// This or [`GoogleCloudStorageBuilder::with_service_account_key`] must be
- /// set.
- ///
- /// Example `"/tmp/gcs.json"`.
- ///
- /// Example contents of `gcs.json`:
- ///
- /// ```json
- /// {
- /// "gcs_base_url": "https://localhost:4443",
- /// "disable_oauth": true,
- /// "client_email": "",
- /// "private_key": ""
- /// }
- /// ```
- pub fn with_service_account_path(
- mut self,
- service_account_path: impl Into<String>,
- ) -> Self {
- self.service_account_path = Some(service_account_path.into());
- self
- }
-
- /// Set the service account key. The service account must be in the JSON
- /// format.
- ///
- /// This or [`GoogleCloudStorageBuilder::with_service_account_path`] must
be
- /// set.
- pub fn with_service_account_key(
- mut self,
- service_account: impl Into<String>,
- ) -> Self {
- self.service_account_key = Some(service_account.into());
- self
- }
-
- /// Set the path to the application credentials file.
- ///
- /// <https://cloud.google.com/docs/authentication/provide-credentials-adc>
- pub fn with_application_credentials(
- mut self,
- application_credentials_path: impl Into<String>,
- ) -> Self {
- self.application_credentials_path =
Some(application_credentials_path.into());
- self
- }
-
- /// Set the credential provider overriding any other options
- pub fn with_credentials(mut self, credentials: GcpCredentialProvider) ->
Self {
- self.credentials = Some(credentials);
- self
- }
-
- /// Set the retry configuration
- pub fn with_retry(mut self, retry_config: RetryConfig) -> Self {
- self.retry_config = retry_config;
- self
- }
-
- /// Set the proxy_url to be used by the underlying client
- pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
- self.client_options = self.client_options.with_proxy_url(proxy_url);
- self
- }
-
- /// Set a trusted proxy CA certificate
- pub fn with_proxy_ca_certificate(
- mut self,
- proxy_ca_certificate: impl Into<String>,
- ) -> Self {
- self.client_options = self
- .client_options
- .with_proxy_ca_certificate(proxy_ca_certificate);
- self
- }
-
- /// Set a list of hosts to exclude from proxy connections
- pub fn with_proxy_excludes(mut self, proxy_excludes: impl Into<String>) ->
Self {
- self.client_options =
self.client_options.with_proxy_excludes(proxy_excludes);
- self
- }
-
- /// Sets the client options, overriding any already set
- pub fn with_client_options(mut self, options: ClientOptions) -> Self {
- self.client_options = options;
- self
- }
-
- /// Configure a connection to Google Cloud Storage, returning a
- /// new [`GoogleCloudStorage`] and consuming `self`
- pub fn build(mut self) -> Result<GoogleCloudStorage> {
- if let Some(url) = self.url.take() {
- self.parse_url(&url)?;
- }
-
- let bucket_name = self.bucket_name.ok_or(Error::MissingBucketName {})?;
-
- let client = self.client_options.client()?;
-
- // First try to initialize from the service account information.
- let service_account_credentials =
- match (self.service_account_path, self.service_account_key) {
- (Some(path), None) => Some(
- ServiceAccountCredentials::from_file(path)
- .context(CredentialSnafu)?,
- ),
- (None, Some(key)) => Some(
-
ServiceAccountCredentials::from_key(&key).context(CredentialSnafu)?,
- ),
- (None, None) => None,
- (Some(_), Some(_)) => {
- return Err(Error::ServiceAccountPathAndKeyProvided.into())
- }
- };
-
- // Then try to initialize from the application credentials file, or
the environment.
- let application_default_credentials =
ApplicationDefaultCredentials::read(
- self.application_credentials_path.as_deref(),
- )?;
-
- let disable_oauth = service_account_credentials
- .as_ref()
- .map(|c| c.disable_oauth)
- .unwrap_or(false);
-
- let gcs_base_url: String = service_account_credentials
- .as_ref()
- .and_then(|c| c.gcs_base_url.clone())
- .unwrap_or_else(|| DEFAULT_GCS_BASE_URL.to_string());
-
- let credentials = if let Some(credentials) = self.credentials {
- credentials
- } else if disable_oauth {
- Arc::new(StaticCredentialProvider::new(GcpCredential {
- bearer: "".to_string(),
- })) as _
- } else if let Some(credentials) = service_account_credentials {
- Arc::new(TokenCredentialProvider::new(
- credentials.token_provider()?,
- self.client_options.client()?,
- self.retry_config.clone(),
- )) as _
- } else if let Some(credentials) = application_default_credentials {
- match credentials {
- ApplicationDefaultCredentials::AuthorizedUser(token) => {
- Arc::new(TokenCredentialProvider::new(
- token,
- self.client_options.client()?,
- self.retry_config.clone(),
- )) as _
- }
- ApplicationDefaultCredentials::ServiceAccount(token) => {
- Arc::new(TokenCredentialProvider::new(
- token.token_provider()?,
- self.client_options.client()?,
- self.retry_config.clone(),
- )) as _
- }
- }
- } else {
- Arc::new(TokenCredentialProvider::new(
- InstanceCredentialProvider::default(),
- self.client_options.metadata_client()?,
- self.retry_config.clone(),
- )) as _
- };
-
- let encoded_bucket_name =
- percent_encode(bucket_name.as_bytes(),
NON_ALPHANUMERIC).to_string();
-
- Ok(GoogleCloudStorage {
- client: Arc::new(GoogleCloudStorageClient {
- client,
- base_url: gcs_base_url,
- credentials,
- bucket_name,
- bucket_name_encoded: encoded_bucket_name,
- retry_config: self.retry_config,
- client_options: self.client_options,
- max_list_results: None,
- }),
- })
- }
-}
-
#[cfg(test)]
mod test {
+
use bytes::Bytes;
- use std::collections::HashMap;
- use std::io::Write;
- use tempfile::NamedTempFile;
+ use credential::DEFAULT_GCS_BASE_URL;
use crate::tests::*;
use super::*;
- const FAKE_KEY: &str = r#"{"private_key": "private_key", "private_key_id":
"private_key_id", "client_email":"client_email", "disable_oauth":true}"#;
const NON_EXISTENT_NAME: &str = "nonexistentname";
#[tokio::test]
@@ -1104,7 +195,7 @@ mod test {
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
- if integration.client.base_url == DEFAULT_GCS_BASE_URL {
+ if integration.client.config().base_url == DEFAULT_GCS_BASE_URL {
// Fake GCS server doesn't currently honor ifGenerationMatch
// https://github.com/fsouza/fake-gcs-server/issues/994
copy_if_not_exists(&integration).await;
@@ -1198,140 +289,4 @@ mod test {
err
)
}
-
- #[tokio::test]
- async fn gcs_test_proxy_url() {
- let mut tfile = NamedTempFile::new().unwrap();
- write!(tfile, "{FAKE_KEY}").unwrap();
- let service_account_path = tfile.path();
- let gcs = GoogleCloudStorageBuilder::new()
- .with_service_account_path(service_account_path.to_str().unwrap())
- .with_bucket_name("foo")
- .with_proxy_url("https://example.com")
- .build();
- assert!(dbg!(gcs).is_ok());
-
- let err = GoogleCloudStorageBuilder::new()
- .with_service_account_path(service_account_path.to_str().unwrap())
- .with_bucket_name("foo")
- .with_proxy_url("asdf://example.com")
- .build()
- .unwrap_err()
- .to_string();
-
- assert_eq!(
- "Generic HTTP client error: builder error: unknown proxy scheme",
- err
- );
- }
-
- #[test]
- fn gcs_test_urls() {
- let mut builder = GoogleCloudStorageBuilder::new();
- builder.parse_url("gs://bucket/path").unwrap();
- assert_eq!(builder.bucket_name, Some("bucket".to_string()));
-
- let err_cases = ["mailto://bucket/path", "gs://bucket.mydomain/path"];
- let mut builder = GoogleCloudStorageBuilder::new();
- for case in err_cases {
- builder.parse_url(case).unwrap_err();
- }
- }
-
- #[test]
- fn gcs_test_service_account_key_only() {
- let _ = GoogleCloudStorageBuilder::new()
- .with_service_account_key(FAKE_KEY)
- .with_bucket_name("foo")
- .build()
- .unwrap();
- }
-
- #[test]
- fn gcs_test_service_account_key_and_path() {
- let mut tfile = NamedTempFile::new().unwrap();
- write!(tfile, "{FAKE_KEY}").unwrap();
- let _ = GoogleCloudStorageBuilder::new()
- .with_service_account_key(FAKE_KEY)
- .with_service_account_path(tfile.path().to_str().unwrap())
- .with_bucket_name("foo")
- .build()
- .unwrap_err();
- }
-
- #[test]
- fn gcs_test_config_from_map() {
- let google_service_account =
"object_store:fake_service_account".to_string();
- let google_bucket_name = "object_store:fake_bucket".to_string();
- let options = HashMap::from([
- ("google_service_account", google_service_account.clone()),
- ("google_bucket_name", google_bucket_name.clone()),
- ]);
-
- let builder = options
- .iter()
- .fold(GoogleCloudStorageBuilder::new(), |builder, (key, value)| {
- builder.with_config(key.parse().unwrap(), value)
- });
-
- assert_eq!(
- builder.service_account_path.unwrap(),
- google_service_account.as_str()
- );
- assert_eq!(builder.bucket_name.unwrap(), google_bucket_name.as_str());
- }
-
- #[test]
- fn gcs_test_config_get_value() {
- let google_service_account =
"object_store:fake_service_account".to_string();
- let google_bucket_name = "object_store:fake_bucket".to_string();
- let builder = GoogleCloudStorageBuilder::new()
- .with_config(GoogleConfigKey::ServiceAccount,
&google_service_account)
- .with_config(GoogleConfigKey::Bucket, &google_bucket_name);
-
- assert_eq!(
- builder
- .get_config_value(&GoogleConfigKey::ServiceAccount)
- .unwrap(),
- google_service_account
- );
- assert_eq!(
- builder.get_config_value(&GoogleConfigKey::Bucket).unwrap(),
- google_bucket_name
- );
- }
-
- #[test]
- fn gcs_test_config_aliases() {
- // Service account path
- for alias in [
- "google_service_account",
- "service_account",
- "google_service_account_path",
- "service_account_path",
- ] {
- let builder = GoogleCloudStorageBuilder::new()
- .with_config(alias.parse().unwrap(), "/fake/path.json");
- assert_eq!("/fake/path.json",
builder.service_account_path.unwrap());
- }
-
- // Service account key
- for alias in ["google_service_account_key", "service_account_key"] {
- let builder = GoogleCloudStorageBuilder::new()
- .with_config(alias.parse().unwrap(), FAKE_KEY);
- assert_eq!(FAKE_KEY, builder.service_account_key.unwrap());
- }
-
- // Bucket name
- for alias in [
- "google_bucket",
- "google_bucket_name",
- "bucket",
- "bucket_name",
- ] {
- let builder = GoogleCloudStorageBuilder::new()
- .with_config(alias.parse().unwrap(), "fake_bucket");
- assert_eq!("fake_bucket", builder.bucket_name.unwrap());
- }
- }
}