This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 108b7a8d0 Consistently use GCP XML API (#4207)
108b7a8d0 is described below
commit 108b7a8d0002f9ffc8f3e626f488ec497991b503
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon May 15 18:56:21 2023 +0100
Consistently use GCP XML API (#4207)
* Consistently use GCP XML API
* Use updated fake-gcs-server
* Review feedback
---
.github/workflows/object_store.yml | 3 +-
object_store/CONTRIBUTING.md | 2 +-
object_store/Cargo.toml | 2 +-
object_store/src/aws/client.rs | 69 +---------------
object_store/src/aws/mod.rs | 57 ++-----------
object_store/src/azure/mod.rs | 60 ++------------
object_store/src/client/header.rs | 83 +++++++++++++++++++
object_store/src/client/list.rs | 85 ++++++++++++++++++++
object_store/src/client/mod.rs | 6 ++
object_store/src/gcp/mod.rs | 161 ++++++++++++-------------------------
object_store/src/prefix.rs | 6 +-
11 files changed, 247 insertions(+), 287 deletions(-)
diff --git a/.github/workflows/object_store.yml
b/.github/workflows/object_store.yml
index 65c78df18..df43ae3bf 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -95,8 +95,9 @@ jobs:
- uses: actions/checkout@v3
- name: Configure Fake GCS Server (GCP emulation)
+ # Custom image - see fsouza/fake-gcs-server#1164
run: |
- docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http
+ docker run -d -p 4443:4443 tustvold/fake-gcs-server -scheme http
-backend memory -public-host localhost:4443
# Give the container a moment to start up prior to configuring it
sleep 1
curl -v -X POST --data-binary '{"name":"test-bucket"}' -H
"Content-Type: application/json" "http://localhost:4443/storage/v1/b"
diff --git a/object_store/CONTRIBUTING.md b/object_store/CONTRIBUTING.md
index 550640d93..47c294022 100644
--- a/object_store/CONTRIBUTING.md
+++ b/object_store/CONTRIBUTING.md
To test the GCS integration, we use [Fake GCS Server](https://github.com/fsouza/fake-gcs-server)
Startup the fake server:
```shell
-docker run -p 4443:4443 fsouza/fake-gcs-server -scheme http
+docker run -p 4443:4443 tustvold/fake-gcs-server -scheme http
```
Configure the account:
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index e25801b6c..c6b89fa23 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -68,7 +68,7 @@ tokio = { version = "1.25.0", features = ["sync", "macros",
"rt", "time", "io-ut
nix = "0.26.1"
[features]
-cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest",
"reqwest/json","reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
+cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest",
"reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
azure = ["cloud"]
gcp = ["cloud", "rustls-pemfile"]
aws = ["cloud"]
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index b2d01abfb..1cdf785e5 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -18,6 +18,7 @@
use crate::aws::checksum::Checksum;
use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
use crate::aws::{STORE, STRICT_PATH_ENCODE_SET};
+use crate::client::list::ListResponse;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
use crate::client::GetOptionsExt;
@@ -25,13 +26,12 @@ use crate::multipart::UploadPart;
use crate::path::DELIMITER;
use crate::util::format_prefix;
use crate::{
- BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, ObjectMeta,
Path,
- Result, RetryConfig, StreamExt,
+ BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, Path,
Result,
+ RetryConfig, StreamExt,
};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
-use chrono::{DateTime, Utc};
use percent_encoding::{utf8_percent_encode, PercentEncode};
use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response};
use serde::{Deserialize, Serialize};
@@ -109,69 +109,6 @@ impl From<Error> for crate::Error {
}
}
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListResponse {
- #[serde(default)]
- pub contents: Vec<ListContents>,
- #[serde(default)]
- pub common_prefixes: Vec<ListPrefix>,
- #[serde(default)]
- pub next_continuation_token: Option<String>,
-}
-
-impl TryFrom<ListResponse> for ListResult {
- type Error = crate::Error;
-
- fn try_from(value: ListResponse) -> Result<Self> {
- let common_prefixes = value
- .common_prefixes
- .into_iter()
- .map(|x| Ok(Path::parse(x.prefix)?))
- .collect::<Result<_>>()?;
-
- let objects = value
- .contents
- .into_iter()
- .map(TryFrom::try_from)
- .collect::<Result<_>>()?;
-
- Ok(Self {
- common_prefixes,
- objects,
- })
- }
-}
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListPrefix {
- pub prefix: String,
-}
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListContents {
- pub key: String,
- pub size: usize,
- pub last_modified: DateTime<Utc>,
- #[serde(rename = "ETag")]
- pub e_tag: Option<String>,
-}
-
-impl TryFrom<ListContents> for ObjectMeta {
- type Error = crate::Error;
-
- fn try_from(value: ListContents) -> Result<Self> {
- Ok(Self {
- location: Path::parse(value.key)?,
- last_modified: value.last_modified,
- size: value.size,
- e_tag: value.e_tag,
- })
- }
-}
-
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct InitiateMultipart {
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 3f9b4803f..2c38a9b71 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -33,7 +33,6 @@
use async_trait::async_trait;
use bytes::Bytes;
-use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use itertools::Itertools;
@@ -52,6 +51,7 @@ use crate::aws::credential::{
AwsCredential, CredentialProvider, InstanceCredentialProvider,
StaticCredentialProvider, WebIdentityProvider,
};
+use crate::client::header::header_meta;
use crate::client::ClientConfigKey;
use crate::config::ConfigValue;
use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl,
UploadPart};
@@ -87,24 +87,6 @@ static METADATA_ENDPOINT: &str = "http://169.254.169.254";
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
- #[snafu(display("Last-Modified Header missing from response"))]
- MissingLastModified,
-
- #[snafu(display("Content-Length Header missing from response"))]
- MissingContentLength,
-
- #[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
- InvalidLastModified {
- last_modified: String,
- source: chrono::ParseError,
- },
-
- #[snafu(display("Invalid content length '{}': {}", content_length,
source))]
- InvalidContentLength {
- content_length: String,
- source: std::num::ParseIntError,
- },
-
#[snafu(display("Missing region"))]
MissingRegion,
@@ -155,6 +137,11 @@ enum Error {
#[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
RegionParse { bucket: String },
+
+ #[snafu(display("Failed to parse headers: {}", source))]
+ Header {
+ source: crate::client::header::Error,
+ },
}
impl From<Error> for super::Error {
@@ -261,41 +248,11 @@ impl ObjectStore for AmazonS3 {
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
- use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
-
let options = GetOptions::default();
// Extract meta from headers
//
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
let response = self.client.get_request(location, options, true).await?;
- let headers = response.headers();
-
- let last_modified = headers
- .get(LAST_MODIFIED)
- .context(MissingLastModifiedSnafu)?;
-
- let content_length = headers
- .get(CONTENT_LENGTH)
- .context(MissingContentLengthSnafu)?;
-
- let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
- let last_modified = DateTime::parse_from_rfc2822(last_modified)
- .context(InvalidLastModifiedSnafu { last_modified })?
- .with_timezone(&Utc);
-
- let content_length = content_length.to_str().context(BadHeaderSnafu)?;
- let content_length = content_length
- .parse()
- .context(InvalidContentLengthSnafu { content_length })?;
-
- let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?;
- let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;
-
- Ok(ObjectMeta {
- location: location.clone(),
- last_modified,
- size: content_length,
- e_tag: Some(e_tag.to_string()),
- })
+ Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
}
async fn delete(&self, location: &Path) -> Result<()> {
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 6726241aa..0f8dae00c 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -38,7 +38,6 @@ use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
-use chrono::{TimeZone, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use percent_encoding::percent_decode_str;
use serde::{Deserialize, Serialize};
@@ -50,9 +49,9 @@ use std::{collections::BTreeSet, str::FromStr};
use tokio::io::AsyncWrite;
use url::Url;
+use crate::client::header::header_meta;
use crate::client::ClientConfigKey;
use crate::config::ConfigValue;
-use crate::util::RFC1123_FMT;
pub use credential::authority_hosts;
mod client;
@@ -75,24 +74,6 @@ const MSI_ENDPOINT_ENV_KEY: &str = "IDENTITY_ENDPOINT";
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
- #[snafu(display("Last-Modified Header missing from response"))]
- MissingLastModified,
-
- #[snafu(display("Content-Length Header missing from response"))]
- MissingContentLength,
-
- #[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
- InvalidLastModified {
- last_modified: String,
- source: chrono::ParseError,
- },
-
- #[snafu(display("Invalid content length '{}': {}", content_length,
source))]
- InvalidContentLength {
- content_length: String,
- source: std::num::ParseIntError,
- },
-
#[snafu(display("Received header containing non-ASCII data"))]
BadHeader { source: reqwest::header::ToStrError },
@@ -146,6 +127,11 @@ enum Error {
#[snafu(display("ETag Header missing from response"))]
MissingEtag,
+
+ #[snafu(display("Failed to parse headers: {}", source))]
+ Header {
+ source: crate::client::header::Error,
+ },
}
impl From<Error> for super::Error {
@@ -223,44 +209,12 @@ impl ObjectStore for MicrosoftAzure {
}
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
- use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
let options = GetOptions::default();
// Extract meta from headers
//
https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
let response = self.client.get_request(location, options, true).await?;
- let headers = response.headers();
-
- let last_modified = headers
- .get(LAST_MODIFIED)
- .ok_or(Error::MissingLastModified)?
- .to_str()
- .context(BadHeaderSnafu)?;
- let last_modified = Utc
- .datetime_from_str(last_modified, RFC1123_FMT)
- .context(InvalidLastModifiedSnafu { last_modified })?;
-
- let content_length = headers
- .get(CONTENT_LENGTH)
- .ok_or(Error::MissingContentLength)?
- .to_str()
- .context(BadHeaderSnafu)?;
- let content_length = content_length
- .parse()
- .context(InvalidContentLengthSnafu { content_length })?;
-
- let e_tag = headers
- .get(ETAG)
- .ok_or(Error::MissingEtag)?
- .to_str()
- .context(BadHeaderSnafu)?;
-
- Ok(ObjectMeta {
- location: location.clone(),
- last_modified,
- size: content_length,
- e_tag: Some(e_tag.to_string()),
- })
+ Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
}
async fn delete(&self, location: &Path) -> Result<()> {
diff --git a/object_store/src/client/header.rs
b/object_store/src/client/header.rs
new file mode 100644
index 000000000..cc4f16eaa
--- /dev/null
+++ b/object_store/src/client/header.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Logic for extracting ObjectMeta from headers used by AWS, GCP and Azure
+
+use crate::path::Path;
+use crate::ObjectMeta;
+use chrono::{DateTime, Utc};
+use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
+use hyper::HeaderMap;
+use snafu::{OptionExt, ResultExt, Snafu};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+ #[snafu(display("ETag Header missing from response"))]
+ MissingEtag,
+
+ #[snafu(display("Received header containing non-ASCII data"))]
+ BadHeader { source: reqwest::header::ToStrError },
+
+ #[snafu(display("Last-Modified Header missing from response"))]
+ MissingLastModified,
+
+ #[snafu(display("Content-Length Header missing from response"))]
+ MissingContentLength,
+
+ #[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
+ InvalidLastModified {
+ last_modified: String,
+ source: chrono::ParseError,
+ },
+
+ #[snafu(display("Invalid content length '{}': {}", content_length,
source))]
+ InvalidContentLength {
+ content_length: String,
+ source: std::num::ParseIntError,
+ },
+}
+
+/// Extracts [`ObjectMeta`] from the provided [`HeaderMap`]
+pub fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta,
Error> {
+ let last_modified = headers
+ .get(LAST_MODIFIED)
+ .context(MissingLastModifiedSnafu)?;
+
+ let content_length = headers
+ .get(CONTENT_LENGTH)
+ .context(MissingContentLengthSnafu)?;
+
+ let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
+ let last_modified = DateTime::parse_from_rfc2822(last_modified)
+ .context(InvalidLastModifiedSnafu { last_modified })?
+ .with_timezone(&Utc);
+
+ let content_length = content_length.to_str().context(BadHeaderSnafu)?;
+ let content_length = content_length
+ .parse()
+ .context(InvalidContentLengthSnafu { content_length })?;
+
+ let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?;
+ let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;
+
+ Ok(ObjectMeta {
+ location: location.clone(),
+ last_modified,
+ size: content_length,
+ e_tag: Some(e_tag.to_string()),
+ })
+}
diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs
new file mode 100644
index 000000000..6a3889e3b
--- /dev/null
+++ b/object_store/src/client/list.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The list response format used by GCP and AWS
+
+use crate::path::Path;
+use crate::{ListResult, ObjectMeta, Result};
+use chrono::{DateTime, Utc};
+use serde::Deserialize;
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListResponse {
+ #[serde(default)]
+ pub contents: Vec<ListContents>,
+ #[serde(default)]
+ pub common_prefixes: Vec<ListPrefix>,
+ #[serde(default)]
+ pub next_continuation_token: Option<String>,
+}
+
+impl TryFrom<ListResponse> for ListResult {
+ type Error = crate::Error;
+
+ fn try_from(value: ListResponse) -> Result<Self> {
+ let common_prefixes = value
+ .common_prefixes
+ .into_iter()
+ .map(|x| Ok(Path::parse(x.prefix)?))
+ .collect::<Result<_>>()?;
+
+ let objects = value
+ .contents
+ .into_iter()
+ .map(TryFrom::try_from)
+ .collect::<Result<_>>()?;
+
+ Ok(Self {
+ common_prefixes,
+ objects,
+ })
+ }
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListPrefix {
+ pub prefix: String,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListContents {
+ pub key: String,
+ pub size: usize,
+ pub last_modified: DateTime<Utc>,
+ #[serde(rename = "ETag")]
+ pub e_tag: Option<String>,
+}
+
+impl TryFrom<ListContents> for ObjectMeta {
+ type Error = crate::Error;
+
+ fn try_from(value: ListContents) -> Result<Self> {
+ Ok(Self {
+ location: Path::parse(value.key)?,
+ last_modified: value.last_modified,
+ size: value.size,
+ e_tag: value.e_tag,
+ })
+ }
+}
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index be44a9f99..c6a73fe7a 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -26,6 +26,12 @@ pub mod retry;
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod token;
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub mod header;
+
+#[cfg(any(feature = "aws", feature = "gcp"))]
+pub mod list;
+
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 41a91fef8..32f4055f1 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -36,15 +36,16 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
-use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
-use percent_encoding::{percent_encode, NON_ALPHANUMERIC};
+use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::{header, Client, Method, Response, StatusCode};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;
+use crate::client::header::header_meta;
+use crate::client::list::ListResponse;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
use crate::client::{ClientConfigKey, GetOptionsExt};
@@ -82,6 +83,9 @@ enum Error {
#[snafu(display("Error getting list response body: {}", source))]
ListResponseBody { source: reqwest::Error },
+ #[snafu(display("Got invalid list response: {}", source))]
+ InvalidListResponse { source: quick_xml::de::DeError },
+
#[snafu(display("Error performing get request {}: {}", path, source))]
GetRequest {
source: crate::client::retry::Error,
@@ -143,6 +147,11 @@ enum Error {
#[snafu(display("Configuration key: '{}' is not known.", key))]
UnknownConfigurationKey { key: String },
+
+ #[snafu(display("Failed to parse headers: {}", source))]
+ Header {
+ source: crate::client::header::Error,
+ },
}
impl From<Error> for super::Error {
@@ -162,25 +171,6 @@ impl From<Error> for super::Error {
}
}
-#[derive(serde::Deserialize, Debug)]
-#[serde(rename_all = "camelCase")]
-struct ListResponse {
- next_page_token: Option<String>,
- #[serde(default)]
- prefixes: Vec<String>,
- #[serde(default)]
- items: Vec<Object>,
-}
-
-#[derive(serde::Deserialize, Debug)]
-struct Object {
- name: String,
- size: String,
- updated: DateTime<Utc>,
- #[serde(rename = "etag")]
- e_tag: Option<String>,
-}
-
#[derive(serde::Deserialize, Debug)]
#[serde(rename_all = "PascalCase")]
struct InitiateMultipartUploadResult {
@@ -248,15 +238,11 @@ impl GoogleCloudStorageClient {
}
fn object_url(&self, path: &Path) -> String {
- let encoded =
- percent_encoding::utf8_percent_encode(path.as_ref(),
NON_ALPHANUMERIC);
- format!(
- "{}/storage/v1/b/{}/o/{}",
- self.base_url, self.bucket_name_encoded, encoded
- )
+ let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
+ format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, encoded)
}
- /// Perform a get request
<https://cloud.google.com/storage/docs/json_api/v1/objects/get>
+ /// Perform a get request
<https://cloud.google.com/storage/docs/xml-api/get-object-download>
async fn get_request(
&self,
path: &Path,
@@ -266,16 +252,15 @@ impl GoogleCloudStorageClient {
let token = self.get_token().await?;
let url = self.object_url(path);
- let alt = match head {
- true => "json",
- false => "media",
+ let method = match head {
+ true => Method::HEAD,
+ false => Method::GET,
};
- let builder = self.client.request(Method::GET, url);
-
- let response = builder
+ let response = self
+ .client
+ .request(method, url)
.bearer_auth(token)
- .query(&[("alt", alt)])
.with_get_options(options)
.send_retry(&self.retry_config)
.await
@@ -286,13 +271,10 @@ impl GoogleCloudStorageClient {
Ok(response)
}
- /// Perform a put request
<https://cloud.google.com/storage/docs/json_api/v1/objects/insert>
+ /// Perform a put request
<https://cloud.google.com/storage/docs/xml-api/put-object-upload>
async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> {
let token = self.get_token().await?;
- let url = format!(
- "{}/upload/storage/v1/b/{}/o",
- self.base_url, self.bucket_name_encoded
- );
+ let url = self.object_url(path);
let content_type = self
.client_options
@@ -300,11 +282,10 @@ impl GoogleCloudStorageClient {
.unwrap_or("application/octet-stream");
self.client
- .request(Method::POST, url)
+ .request(Method::PUT, url)
.bearer_auth(token)
.header(header::CONTENT_TYPE, content_type)
.header(header::CONTENT_LENGTH, payload.len())
- .query(&[("uploadType", "media"), ("name", path.as_ref())])
.body(payload)
.send_retry(&self.retry_config)
.await
@@ -373,7 +354,7 @@ impl GoogleCloudStorageClient {
Ok(())
}
- /// Perform a delete request
<https://cloud.google.com/storage/docs/json_api/v1/objects/delete>
+ /// Perform a delete request
<https://cloud.google.com/storage/docs/xml-api/delete-object>
async fn delete_request(&self, path: &Path) -> Result<()> {
let token = self.get_token().await?;
let url = self.object_url(path);
@@ -390,7 +371,7 @@ impl GoogleCloudStorageClient {
Ok(())
}
- /// Perform a copy request
<https://cloud.google.com/storage/docs/json_api/v1/objects/copy>
+ /// Perform a copy request
<https://cloud.google.com/storage/docs/xml-api/put-object-copy>
async fn copy_request(
&self,
from: &Path,
@@ -398,24 +379,18 @@ impl GoogleCloudStorageClient {
if_not_exists: bool,
) -> Result<()> {
let token = self.get_token().await?;
+ let url = self.object_url(to);
- let source =
- percent_encoding::utf8_percent_encode(from.as_ref(),
NON_ALPHANUMERIC);
- let destination =
- percent_encoding::utf8_percent_encode(to.as_ref(),
NON_ALPHANUMERIC);
- let url = format!(
- "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}",
- self.base_url,
- self.bucket_name_encoded,
- source,
- self.bucket_name_encoded,
- destination
- );
+ let from = utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC);
+ let source = format!("{}/{}", self.bucket_name_encoded, from);
- let mut builder = self.client.request(Method::POST, url);
+ let mut builder = self
+ .client
+ .request(Method::PUT, url)
+ .header("x-goog-copy-source", source);
if if_not_exists {
- builder = builder.query(&[("ifGenerationMatch", "0")]);
+ builder = builder.header("x-goog-if-generation-match", 0);
}
builder
@@ -436,7 +411,7 @@ impl GoogleCloudStorageClient {
Ok(())
}
- /// Perform a list request
<https://cloud.google.com/storage/docs/json_api/v1/objects/list>
+ /// Perform a list request
<https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
async fn list_request(
&self,
prefix: Option<&str>,
@@ -444,13 +419,10 @@ impl GoogleCloudStorageClient {
page_token: Option<&str>,
) -> Result<ListResponse> {
let token = self.get_token().await?;
+ let url = format!("{}/{}", self.base_url, self.bucket_name_encoded);
- let url = format!(
- "{}/storage/v1/b/{}/o",
- self.base_url, self.bucket_name_encoded
- );
-
- let mut query = Vec::with_capacity(4);
+ let mut query = Vec::with_capacity(5);
+ query.push(("list-type", "2"));
if delimiter {
query.push(("delimiter", DELIMITER))
}
@@ -460,14 +432,14 @@ impl GoogleCloudStorageClient {
}
if let Some(page_token) = page_token {
- query.push(("pageToken", page_token))
+ query.push(("continuation-token", page_token))
}
if let Some(max_results) = &self.max_list_results {
- query.push(("maxResults", max_results))
+ query.push(("max-keys", max_results))
}
- let response: ListResponse = self
+ let response = self
.client
.request(Method::GET, url)
.query(&query)
@@ -475,10 +447,13 @@ impl GoogleCloudStorageClient {
.send_retry(&self.retry_config)
.await
.context(ListRequestSnafu)?
- .json()
+ .bytes()
.await
.context(ListResponseBodySnafu)?;
+ let response: ListResponse =
quick_xml::de::from_reader(response.reader())
+ .context(InvalidListResponseSnafu)?;
+
Ok(response)
}
@@ -487,14 +462,14 @@ impl GoogleCloudStorageClient {
&self,
prefix: Option<&Path>,
delimiter: bool,
- ) -> BoxStream<'_, Result<ListResponse>> {
+ ) -> BoxStream<'_, Result<ListResult>> {
let prefix = format_prefix(prefix);
stream_paginated(prefix, move |prefix, token| async move {
let mut r = self
.list_request(prefix.as_deref(), delimiter, token.as_deref())
.await?;
- let next_token = r.next_page_token.take();
- Ok((r, prefix, next_token))
+ let next_token = r.next_continuation_token.take();
+ Ok((r.try_into()?, prefix, next_token))
})
.boxed()
}
@@ -639,12 +614,6 @@ impl ObjectStore for GoogleCloudStorage {
}
async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
- if options.if_modified_since.is_some() ||
options.if_unmodified_since.is_some() {
- return Err(super::Error::NotSupported {
- source: "ModifiedSince Preconditions not supported by
GoogleCloudStorage JSON API".to_string().into(),
- });
- }
-
let response = self.client.get_request(location, options,
false).await?;
let stream = response
.bytes_stream()
@@ -660,10 +629,7 @@ impl ObjectStore for GoogleCloudStorage {
async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let options = GetOptions::default();
let response = self.client.get_request(location, options, true).await?;
- let object = response.json().await.context(GetResponseBodySnafu {
- path: location.as_ref(),
- })?;
- convert_object_meta(&object)
+ Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
}
async fn delete(&self, location: &Path) -> Result<()> {
@@ -677,11 +643,7 @@ impl ObjectStore for GoogleCloudStorage {
let stream = self
.client
.list_paginated(prefix, false)
- .map_ok(|r| {
- futures::stream::iter(
- r.items.into_iter().map(|x| convert_object_meta(&x)),
- )
- })
+ .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
.boxed();
@@ -696,15 +658,8 @@ impl ObjectStore for GoogleCloudStorage {
while let Some(result) = stream.next().await {
let response = result?;
-
- for p in response.prefixes {
- common_prefixes.insert(Path::parse(p)?);
- }
-
- objects.reserve(response.items.len());
- for object in &response.items {
- objects.push(convert_object_meta(object)?);
- }
+ common_prefixes.extend(response.common_prefixes.into_iter());
+ objects.extend(response.objects.into_iter());
}
Ok(ListResult {
@@ -1170,20 +1125,6 @@ impl GoogleCloudStorageBuilder {
}
}
-fn convert_object_meta(object: &Object) -> Result<ObjectMeta> {
- let location = Path::parse(&object.name)?;
- let last_modified = object.updated;
- let size = object.size.parse().context(InvalidSizeSnafu)?;
- let e_tag = object.e_tag.clone();
-
- Ok(ObjectMeta {
- location,
- last_modified,
- size,
- e_tag,
- })
-}
-
#[cfg(test)]
mod test {
use bytes::Bytes;
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index ffe509411..39585f73b 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -119,11 +119,7 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
self.inner.get_range(&full_path, range).await
}
- async fn get_opts(
- &self,
- location: &Path,
- options: GetOptions,
- ) -> Result<GetResult> {
+ async fn get_opts(&self, location: &Path, options: GetOptions) ->
Result<GetResult> {
let full_path = self.full_path(location);
self.inner.get_opts(&full_path, options).await
}