This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 108b7a8d0 Consistently use GCP XML API (#4207)
108b7a8d0 is described below

commit 108b7a8d0002f9ffc8f3e626f488ec497991b503
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Mon May 15 18:56:21 2023 +0100

    Consistently use GCP XML API (#4207)
    
    * Consistently use GCP XML API
    
    * Use updated fake-gcs-server
    
    * Review feedback
---
 .github/workflows/object_store.yml |   3 +-
 object_store/CONTRIBUTING.md       |   2 +-
 object_store/Cargo.toml            |   2 +-
 object_store/src/aws/client.rs     |  69 +---------------
 object_store/src/aws/mod.rs        |  57 ++-----------
 object_store/src/azure/mod.rs      |  60 ++------------
 object_store/src/client/header.rs  |  83 +++++++++++++++++++
 object_store/src/client/list.rs    |  85 ++++++++++++++++++++
 object_store/src/client/mod.rs     |   6 ++
 object_store/src/gcp/mod.rs        | 161 ++++++++++++-------------------------
 object_store/src/prefix.rs         |   6 +-
 11 files changed, 247 insertions(+), 287 deletions(-)

diff --git a/.github/workflows/object_store.yml 
b/.github/workflows/object_store.yml
index 65c78df18..df43ae3bf 100644
--- a/.github/workflows/object_store.yml
+++ b/.github/workflows/object_store.yml
@@ -95,8 +95,9 @@ jobs:
       - uses: actions/checkout@v3
 
       - name: Configure Fake GCS Server (GCP emulation)
+        # Custom image - see fsouza/fake-gcs-server#1164
         run: |
-          docker run -d -p 4443:4443 fsouza/fake-gcs-server -scheme http
+          docker run -d -p 4443:4443 tustvold/fake-gcs-server -scheme http -backend memory -public-host localhost:4443
           # Give the container a moment to start up prior to configuring it
           sleep 1
           curl -v -X POST --data-binary '{"name":"test-bucket"}' -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
diff --git a/object_store/CONTRIBUTING.md b/object_store/CONTRIBUTING.md
index 550640d93..47c294022 100644
--- a/object_store/CONTRIBUTING.md
+++ b/object_store/CONTRIBUTING.md
@@ -103,7 +103,7 @@ To test the GCS integration, we use [Fake GCS 
Server](https://github.com/fsouza/
 Startup the fake server:
 
 ```shell
-docker run -p 4443:4443 fsouza/fake-gcs-server -scheme http
+docker run -p 4443:4443 tustvold/fake-gcs-server -scheme http
 ```
 
 Configure the account:
diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml
index e25801b6c..c6b89fa23 100644
--- a/object_store/Cargo.toml
+++ b/object_store/Cargo.toml
@@ -68,7 +68,7 @@ tokio = { version = "1.25.0", features = ["sync", "macros", 
"rt", "time", "io-ut
 nix = "0.26.1"
 
 [features]
-cloud = ["serde", "serde_json", "quick-xml",  "hyper", "reqwest", 
"reqwest/json","reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
+cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", 
"reqwest/json", "reqwest/stream", "chrono/serde", "base64", "rand", "ring"]
 azure = ["cloud"]
 gcp = ["cloud", "rustls-pemfile"]
 aws = ["cloud"]
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index b2d01abfb..1cdf785e5 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -18,6 +18,7 @@
 use crate::aws::checksum::Checksum;
 use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
 use crate::aws::{STORE, STRICT_PATH_ENCODE_SET};
+use crate::client::list::ListResponse;
 use crate::client::pagination::stream_paginated;
 use crate::client::retry::RetryExt;
 use crate::client::GetOptionsExt;
@@ -25,13 +26,12 @@ use crate::multipart::UploadPart;
 use crate::path::DELIMITER;
 use crate::util::format_prefix;
 use crate::{
-    BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, ObjectMeta, Path,
-    Result, RetryConfig, StreamExt,
+    BoxStream, ClientOptions, GetOptions, ListResult, MultipartId, Path, Result,
+    RetryConfig, StreamExt,
 };
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
-use chrono::{DateTime, Utc};
 use percent_encoding::{utf8_percent_encode, PercentEncode};
 use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method, Response};
 use serde::{Deserialize, Serialize};
@@ -109,69 +109,6 @@ impl From<Error> for crate::Error {
     }
 }
 
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListResponse {
-    #[serde(default)]
-    pub contents: Vec<ListContents>,
-    #[serde(default)]
-    pub common_prefixes: Vec<ListPrefix>,
-    #[serde(default)]
-    pub next_continuation_token: Option<String>,
-}
-
-impl TryFrom<ListResponse> for ListResult {
-    type Error = crate::Error;
-
-    fn try_from(value: ListResponse) -> Result<Self> {
-        let common_prefixes = value
-            .common_prefixes
-            .into_iter()
-            .map(|x| Ok(Path::parse(x.prefix)?))
-            .collect::<Result<_>>()?;
-
-        let objects = value
-            .contents
-            .into_iter()
-            .map(TryFrom::try_from)
-            .collect::<Result<_>>()?;
-
-        Ok(Self {
-            common_prefixes,
-            objects,
-        })
-    }
-}
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListPrefix {
-    pub prefix: String,
-}
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-pub struct ListContents {
-    pub key: String,
-    pub size: usize,
-    pub last_modified: DateTime<Utc>,
-    #[serde(rename = "ETag")]
-    pub e_tag: Option<String>,
-}
-
-impl TryFrom<ListContents> for ObjectMeta {
-    type Error = crate::Error;
-
-    fn try_from(value: ListContents) -> Result<Self> {
-        Ok(Self {
-            location: Path::parse(value.key)?,
-            last_modified: value.last_modified,
-            size: value.size,
-            e_tag: value.e_tag,
-        })
-    }
-}
-
 #[derive(Debug, Deserialize)]
 #[serde(rename_all = "PascalCase")]
 struct InitiateMultipart {
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 3f9b4803f..2c38a9b71 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -33,7 +33,6 @@
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use chrono::{DateTime, Utc};
 use futures::stream::BoxStream;
 use futures::TryStreamExt;
 use itertools::Itertools;
@@ -52,6 +51,7 @@ use crate::aws::credential::{
     AwsCredential, CredentialProvider, InstanceCredentialProvider,
     StaticCredentialProvider, WebIdentityProvider,
 };
+use crate::client::header::header_meta;
 use crate::client::ClientConfigKey;
 use crate::config::ConfigValue;
 use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, 
UploadPart};
@@ -87,24 +87,6 @@ static METADATA_ENDPOINT: &str = "http://169.254.169.254";
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
 enum Error {
-    #[snafu(display("Last-Modified Header missing from response"))]
-    MissingLastModified,
-
-    #[snafu(display("Content-Length Header missing from response"))]
-    MissingContentLength,
-
-    #[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
-    InvalidLastModified {
-        last_modified: String,
-        source: chrono::ParseError,
-    },
-
-    #[snafu(display("Invalid content length '{}': {}", content_length, 
source))]
-    InvalidContentLength {
-        content_length: String,
-        source: std::num::ParseIntError,
-    },
-
     #[snafu(display("Missing region"))]
     MissingRegion,
 
@@ -155,6 +137,11 @@ enum Error {
 
     #[snafu(display("Failed to parse the region for bucket '{}'", bucket))]
     RegionParse { bucket: String },
+
+    #[snafu(display("Failed to parse headers: {}", source))]
+    Header {
+        source: crate::client::header::Error,
+    },
 }
 
 impl From<Error> for super::Error {
@@ -261,41 +248,11 @@ impl ObjectStore for AmazonS3 {
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
-
         let options = GetOptions::default();
         // Extract meta from headers
         // 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
         let response = self.client.get_request(location, options, true).await?;
-        let headers = response.headers();
-
-        let last_modified = headers
-            .get(LAST_MODIFIED)
-            .context(MissingLastModifiedSnafu)?;
-
-        let content_length = headers
-            .get(CONTENT_LENGTH)
-            .context(MissingContentLengthSnafu)?;
-
-        let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
-        let last_modified = DateTime::parse_from_rfc2822(last_modified)
-            .context(InvalidLastModifiedSnafu { last_modified })?
-            .with_timezone(&Utc);
-
-        let content_length = content_length.to_str().context(BadHeaderSnafu)?;
-        let content_length = content_length
-            .parse()
-            .context(InvalidContentLengthSnafu { content_length })?;
-
-        let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?;
-        let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;
-
-        Ok(ObjectMeta {
-            location: location.clone(),
-            last_modified,
-            size: content_length,
-            e_tag: Some(e_tag.to_string()),
-        })
+        Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
     }
 
     async fn delete(&self, location: &Path) -> Result<()> {
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 6726241aa..0f8dae00c 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -38,7 +38,6 @@ use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::Bytes;
-use chrono::{TimeZone, Utc};
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use percent_encoding::percent_decode_str;
 use serde::{Deserialize, Serialize};
@@ -50,9 +49,9 @@ use std::{collections::BTreeSet, str::FromStr};
 use tokio::io::AsyncWrite;
 use url::Url;
 
+use crate::client::header::header_meta;
 use crate::client::ClientConfigKey;
 use crate::config::ConfigValue;
-use crate::util::RFC1123_FMT;
 pub use credential::authority_hosts;
 
 mod client;
@@ -75,24 +74,6 @@ const MSI_ENDPOINT_ENV_KEY: &str = "IDENTITY_ENDPOINT";
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
 enum Error {
-    #[snafu(display("Last-Modified Header missing from response"))]
-    MissingLastModified,
-
-    #[snafu(display("Content-Length Header missing from response"))]
-    MissingContentLength,
-
-    #[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
-    InvalidLastModified {
-        last_modified: String,
-        source: chrono::ParseError,
-    },
-
-    #[snafu(display("Invalid content length '{}': {}", content_length, 
source))]
-    InvalidContentLength {
-        content_length: String,
-        source: std::num::ParseIntError,
-    },
-
     #[snafu(display("Received header containing non-ASCII data"))]
     BadHeader { source: reqwest::header::ToStrError },
 
@@ -146,6 +127,11 @@ enum Error {
 
     #[snafu(display("ETag Header missing from response"))]
     MissingEtag,
+
+    #[snafu(display("Failed to parse headers: {}", source))]
+    Header {
+        source: crate::client::header::Error,
+    },
 }
 
 impl From<Error> for super::Error {
@@ -223,44 +209,12 @@ impl ObjectStore for MicrosoftAzure {
     }
 
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        use reqwest::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
         let options = GetOptions::default();
 
         // Extract meta from headers
         // 
https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
         let response = self.client.get_request(location, options, true).await?;
-        let headers = response.headers();
-
-        let last_modified = headers
-            .get(LAST_MODIFIED)
-            .ok_or(Error::MissingLastModified)?
-            .to_str()
-            .context(BadHeaderSnafu)?;
-        let last_modified = Utc
-            .datetime_from_str(last_modified, RFC1123_FMT)
-            .context(InvalidLastModifiedSnafu { last_modified })?;
-
-        let content_length = headers
-            .get(CONTENT_LENGTH)
-            .ok_or(Error::MissingContentLength)?
-            .to_str()
-            .context(BadHeaderSnafu)?;
-        let content_length = content_length
-            .parse()
-            .context(InvalidContentLengthSnafu { content_length })?;
-
-        let e_tag = headers
-            .get(ETAG)
-            .ok_or(Error::MissingEtag)?
-            .to_str()
-            .context(BadHeaderSnafu)?;
-
-        Ok(ObjectMeta {
-            location: location.clone(),
-            last_modified,
-            size: content_length,
-            e_tag: Some(e_tag.to_string()),
-        })
+        Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
     }
 
     async fn delete(&self, location: &Path) -> Result<()> {
diff --git a/object_store/src/client/header.rs 
b/object_store/src/client/header.rs
new file mode 100644
index 000000000..cc4f16eaa
--- /dev/null
+++ b/object_store/src/client/header.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Logic for extracting ObjectMeta from headers used by AWS, GCP and Azure
+
+use crate::path::Path;
+use crate::ObjectMeta;
+use chrono::{DateTime, Utc};
+use hyper::header::{CONTENT_LENGTH, ETAG, LAST_MODIFIED};
+use hyper::HeaderMap;
+use snafu::{OptionExt, ResultExt, Snafu};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("ETag Header missing from response"))]
+    MissingEtag,
+
+    #[snafu(display("Received header containing non-ASCII data"))]
+    BadHeader { source: reqwest::header::ToStrError },
+
+    #[snafu(display("Last-Modified Header missing from response"))]
+    MissingLastModified,
+
+    #[snafu(display("Content-Length Header missing from response"))]
+    MissingContentLength,
+
+    #[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
+    InvalidLastModified {
+        last_modified: String,
+        source: chrono::ParseError,
+    },
+
+    #[snafu(display("Invalid content length '{}': {}", content_length, 
source))]
+    InvalidContentLength {
+        content_length: String,
+        source: std::num::ParseIntError,
+    },
+}
+
+/// Extracts [`ObjectMeta`] from the provided [`HeaderMap`]
+pub fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta, 
Error> {
+    let last_modified = headers
+        .get(LAST_MODIFIED)
+        .context(MissingLastModifiedSnafu)?;
+
+    let content_length = headers
+        .get(CONTENT_LENGTH)
+        .context(MissingContentLengthSnafu)?;
+
+    let last_modified = last_modified.to_str().context(BadHeaderSnafu)?;
+    let last_modified = DateTime::parse_from_rfc2822(last_modified)
+        .context(InvalidLastModifiedSnafu { last_modified })?
+        .with_timezone(&Utc);
+
+    let content_length = content_length.to_str().context(BadHeaderSnafu)?;
+    let content_length = content_length
+        .parse()
+        .context(InvalidContentLengthSnafu { content_length })?;
+
+    let e_tag = headers.get(ETAG).context(MissingEtagSnafu)?;
+    let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;
+
+    Ok(ObjectMeta {
+        location: location.clone(),
+        last_modified,
+        size: content_length,
+        e_tag: Some(e_tag.to_string()),
+    })
+}
diff --git a/object_store/src/client/list.rs b/object_store/src/client/list.rs
new file mode 100644
index 000000000..6a3889e3b
--- /dev/null
+++ b/object_store/src/client/list.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The list response format used by GCP and AWS
+
+use crate::path::Path;
+use crate::{ListResult, ObjectMeta, Result};
+use chrono::{DateTime, Utc};
+use serde::Deserialize;
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListResponse {
+    #[serde(default)]
+    pub contents: Vec<ListContents>,
+    #[serde(default)]
+    pub common_prefixes: Vec<ListPrefix>,
+    #[serde(default)]
+    pub next_continuation_token: Option<String>,
+}
+
+impl TryFrom<ListResponse> for ListResult {
+    type Error = crate::Error;
+
+    fn try_from(value: ListResponse) -> Result<Self> {
+        let common_prefixes = value
+            .common_prefixes
+            .into_iter()
+            .map(|x| Ok(Path::parse(x.prefix)?))
+            .collect::<Result<_>>()?;
+
+        let objects = value
+            .contents
+            .into_iter()
+            .map(TryFrom::try_from)
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            common_prefixes,
+            objects,
+        })
+    }
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListPrefix {
+    pub prefix: String,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListContents {
+    pub key: String,
+    pub size: usize,
+    pub last_modified: DateTime<Utc>,
+    #[serde(rename = "ETag")]
+    pub e_tag: Option<String>,
+}
+
+impl TryFrom<ListContents> for ObjectMeta {
+    type Error = crate::Error;
+
+    fn try_from(value: ListContents) -> Result<Self> {
+        Ok(Self {
+            location: Path::parse(value.key)?,
+            last_modified: value.last_modified,
+            size: value.size,
+            e_tag: value.e_tag,
+        })
+    }
+}
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index be44a9f99..c6a73fe7a 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -26,6 +26,12 @@ pub mod retry;
 #[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
 pub mod token;
 
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub mod header;
+
+#[cfg(any(feature = "aws", feature = "gcp"))]
+pub mod list;
+
 use std::collections::HashMap;
 use std::str::FromStr;
 use std::time::Duration;
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 41a91fef8..32f4055f1 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -36,15 +36,16 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::{Buf, Bytes};
-use chrono::{DateTime, Utc};
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
-use percent_encoding::{percent_encode, NON_ALPHANUMERIC};
+use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
 use reqwest::{header, Client, Method, Response, StatusCode};
 use serde::{Deserialize, Serialize};
 use snafu::{OptionExt, ResultExt, Snafu};
 use tokio::io::AsyncWrite;
 use url::Url;
 
+use crate::client::header::header_meta;
+use crate::client::list::ListResponse;
 use crate::client::pagination::stream_paginated;
 use crate::client::retry::RetryExt;
 use crate::client::{ClientConfigKey, GetOptionsExt};
@@ -82,6 +83,9 @@ enum Error {
     #[snafu(display("Error getting list response body: {}", source))]
     ListResponseBody { source: reqwest::Error },
 
+    #[snafu(display("Got invalid list response: {}", source))]
+    InvalidListResponse { source: quick_xml::de::DeError },
+
     #[snafu(display("Error performing get request {}: {}", path, source))]
     GetRequest {
         source: crate::client::retry::Error,
@@ -143,6 +147,11 @@ enum Error {
 
     #[snafu(display("Configuration key: '{}' is not known.", key))]
     UnknownConfigurationKey { key: String },
+
+    #[snafu(display("Failed to parse headers: {}", source))]
+    Header {
+        source: crate::client::header::Error,
+    },
 }
 
 impl From<Error> for super::Error {
@@ -162,25 +171,6 @@ impl From<Error> for super::Error {
     }
 }
 
-#[derive(serde::Deserialize, Debug)]
-#[serde(rename_all = "camelCase")]
-struct ListResponse {
-    next_page_token: Option<String>,
-    #[serde(default)]
-    prefixes: Vec<String>,
-    #[serde(default)]
-    items: Vec<Object>,
-}
-
-#[derive(serde::Deserialize, Debug)]
-struct Object {
-    name: String,
-    size: String,
-    updated: DateTime<Utc>,
-    #[serde(rename = "etag")]
-    e_tag: Option<String>,
-}
-
 #[derive(serde::Deserialize, Debug)]
 #[serde(rename_all = "PascalCase")]
 struct InitiateMultipartUploadResult {
@@ -248,15 +238,11 @@ impl GoogleCloudStorageClient {
     }
 
     fn object_url(&self, path: &Path) -> String {
-        let encoded =
-            percent_encoding::utf8_percent_encode(path.as_ref(), 
NON_ALPHANUMERIC);
-        format!(
-            "{}/storage/v1/b/{}/o/{}",
-            self.base_url, self.bucket_name_encoded, encoded
-        )
+        let encoded = utf8_percent_encode(path.as_ref(), NON_ALPHANUMERIC);
+        format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, encoded)
     }
 
-    /// Perform a get request 
<https://cloud.google.com/storage/docs/json_api/v1/objects/get>
+    /// Perform a get request 
<https://cloud.google.com/storage/docs/xml-api/get-object-download>
     async fn get_request(
         &self,
         path: &Path,
@@ -266,16 +252,15 @@ impl GoogleCloudStorageClient {
         let token = self.get_token().await?;
         let url = self.object_url(path);
 
-        let alt = match head {
-            true => "json",
-            false => "media",
+        let method = match head {
+            true => Method::HEAD,
+            false => Method::GET,
         };
 
-        let builder = self.client.request(Method::GET, url);
-
-        let response = builder
+        let response = self
+            .client
+            .request(method, url)
             .bearer_auth(token)
-            .query(&[("alt", alt)])
             .with_get_options(options)
             .send_retry(&self.retry_config)
             .await
@@ -286,13 +271,10 @@ impl GoogleCloudStorageClient {
         Ok(response)
     }
 
-    /// Perform a put request 
<https://cloud.google.com/storage/docs/json_api/v1/objects/insert>
+    /// Perform a put request 
<https://cloud.google.com/storage/docs/xml-api/put-object-upload>
     async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> {
         let token = self.get_token().await?;
-        let url = format!(
-            "{}/upload/storage/v1/b/{}/o",
-            self.base_url, self.bucket_name_encoded
-        );
+        let url = self.object_url(path);
 
         let content_type = self
             .client_options
@@ -300,11 +282,10 @@ impl GoogleCloudStorageClient {
             .unwrap_or("application/octet-stream");
 
         self.client
-            .request(Method::POST, url)
+            .request(Method::PUT, url)
             .bearer_auth(token)
             .header(header::CONTENT_TYPE, content_type)
             .header(header::CONTENT_LENGTH, payload.len())
-            .query(&[("uploadType", "media"), ("name", path.as_ref())])
             .body(payload)
             .send_retry(&self.retry_config)
             .await
@@ -373,7 +354,7 @@ impl GoogleCloudStorageClient {
         Ok(())
     }
 
-    /// Perform a delete request 
<https://cloud.google.com/storage/docs/json_api/v1/objects/delete>
+    /// Perform a delete request 
<https://cloud.google.com/storage/docs/xml-api/delete-object>
     async fn delete_request(&self, path: &Path) -> Result<()> {
         let token = self.get_token().await?;
         let url = self.object_url(path);
@@ -390,7 +371,7 @@ impl GoogleCloudStorageClient {
         Ok(())
     }
 
-    /// Perform a copy request 
<https://cloud.google.com/storage/docs/json_api/v1/objects/copy>
+    /// Perform a copy request 
<https://cloud.google.com/storage/docs/xml-api/put-object-copy>
     async fn copy_request(
         &self,
         from: &Path,
@@ -398,24 +379,18 @@ impl GoogleCloudStorageClient {
         if_not_exists: bool,
     ) -> Result<()> {
         let token = self.get_token().await?;
+        let url = self.object_url(to);
 
-        let source =
-            percent_encoding::utf8_percent_encode(from.as_ref(), 
NON_ALPHANUMERIC);
-        let destination =
-            percent_encoding::utf8_percent_encode(to.as_ref(), 
NON_ALPHANUMERIC);
-        let url = format!(
-            "{}/storage/v1/b/{}/o/{}/copyTo/b/{}/o/{}",
-            self.base_url,
-            self.bucket_name_encoded,
-            source,
-            self.bucket_name_encoded,
-            destination
-        );
+        let from = utf8_percent_encode(from.as_ref(), NON_ALPHANUMERIC);
+        let source = format!("{}/{}", self.bucket_name_encoded, from);
 
-        let mut builder = self.client.request(Method::POST, url);
+        let mut builder = self
+            .client
+            .request(Method::PUT, url)
+            .header("x-goog-copy-source", source);
 
         if if_not_exists {
-            builder = builder.query(&[("ifGenerationMatch", "0")]);
+            builder = builder.header("x-goog-if-generation-match", 0);
         }
 
         builder
@@ -436,7 +411,7 @@ impl GoogleCloudStorageClient {
         Ok(())
     }
 
-    /// Perform a list request 
<https://cloud.google.com/storage/docs/json_api/v1/objects/list>
+    /// Perform a list request 
<https://cloud.google.com/storage/docs/xml-api/get-bucket-list>
     async fn list_request(
         &self,
         prefix: Option<&str>,
@@ -444,13 +419,10 @@ impl GoogleCloudStorageClient {
         page_token: Option<&str>,
     ) -> Result<ListResponse> {
         let token = self.get_token().await?;
+        let url = format!("{}/{}", self.base_url, self.bucket_name_encoded);
 
-        let url = format!(
-            "{}/storage/v1/b/{}/o",
-            self.base_url, self.bucket_name_encoded
-        );
-
-        let mut query = Vec::with_capacity(4);
+        let mut query = Vec::with_capacity(5);
+        query.push(("list-type", "2"));
         if delimiter {
             query.push(("delimiter", DELIMITER))
         }
@@ -460,14 +432,14 @@ impl GoogleCloudStorageClient {
         }
 
         if let Some(page_token) = page_token {
-            query.push(("pageToken", page_token))
+            query.push(("continuation-token", page_token))
         }
 
         if let Some(max_results) = &self.max_list_results {
-            query.push(("maxResults", max_results))
+            query.push(("max-keys", max_results))
         }
 
-        let response: ListResponse = self
+        let response = self
             .client
             .request(Method::GET, url)
             .query(&query)
@@ -475,10 +447,13 @@ impl GoogleCloudStorageClient {
             .send_retry(&self.retry_config)
             .await
             .context(ListRequestSnafu)?
-            .json()
+            .bytes()
             .await
             .context(ListResponseBodySnafu)?;
 
+        let response: ListResponse = quick_xml::de::from_reader(response.reader())
+            .context(InvalidListResponseSnafu)?;
+
         Ok(response)
     }
 
@@ -487,14 +462,14 @@ impl GoogleCloudStorageClient {
         &self,
         prefix: Option<&Path>,
         delimiter: bool,
-    ) -> BoxStream<'_, Result<ListResponse>> {
+    ) -> BoxStream<'_, Result<ListResult>> {
         let prefix = format_prefix(prefix);
         stream_paginated(prefix, move |prefix, token| async move {
             let mut r = self
                 .list_request(prefix.as_deref(), delimiter, token.as_deref())
                 .await?;
-            let next_token = r.next_page_token.take();
-            Ok((r, prefix, next_token))
+            let next_token = r.next_continuation_token.take();
+            Ok((r.try_into()?, prefix, next_token))
         })
         .boxed()
     }
@@ -639,12 +614,6 @@ impl ObjectStore for GoogleCloudStorage {
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
-        if options.if_modified_since.is_some() || 
options.if_unmodified_since.is_some() {
-            return Err(super::Error::NotSupported {
-                source: "ModifiedSince Preconditions not supported by 
GoogleCloudStorage JSON API".to_string().into(),
-            });
-        }
-
         let response = self.client.get_request(location, options, 
false).await?;
         let stream = response
             .bytes_stream()
@@ -660,10 +629,7 @@ impl ObjectStore for GoogleCloudStorage {
     async fn head(&self, location: &Path) -> Result<ObjectMeta> {
         let options = GetOptions::default();
         let response = self.client.get_request(location, options, true).await?;
-        let object = response.json().await.context(GetResponseBodySnafu {
-            path: location.as_ref(),
-        })?;
-        convert_object_meta(&object)
+        Ok(header_meta(location, response.headers()).context(HeaderSnafu)?)
     }
 
     async fn delete(&self, location: &Path) -> Result<()> {
@@ -677,11 +643,7 @@ impl ObjectStore for GoogleCloudStorage {
         let stream = self
             .client
             .list_paginated(prefix, false)
-            .map_ok(|r| {
-                futures::stream::iter(
-                    r.items.into_iter().map(|x| convert_object_meta(&x)),
-                )
-            })
+            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
             .try_flatten()
             .boxed();
 
@@ -696,15 +658,8 @@ impl ObjectStore for GoogleCloudStorage {
 
         while let Some(result) = stream.next().await {
             let response = result?;
-
-            for p in response.prefixes {
-                common_prefixes.insert(Path::parse(p)?);
-            }
-
-            objects.reserve(response.items.len());
-            for object in &response.items {
-                objects.push(convert_object_meta(object)?);
-            }
+            common_prefixes.extend(response.common_prefixes.into_iter());
+            objects.extend(response.objects.into_iter());
         }
 
         Ok(ListResult {
@@ -1170,20 +1125,6 @@ impl GoogleCloudStorageBuilder {
     }
 }
 
-fn convert_object_meta(object: &Object) -> Result<ObjectMeta> {
-    let location = Path::parse(&object.name)?;
-    let last_modified = object.updated;
-    let size = object.size.parse().context(InvalidSizeSnafu)?;
-    let e_tag = object.e_tag.clone();
-
-    Ok(ObjectMeta {
-        location,
-        last_modified,
-        size,
-        e_tag,
-    })
-}
-
 #[cfg(test)]
 mod test {
     use bytes::Bytes;
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index ffe509411..39585f73b 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -119,11 +119,7 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         self.inner.get_range(&full_path, range).await
     }
 
-    async fn get_opts(
-        &self,
-        location: &Path,
-        options: GetOptions,
-    ) -> Result<GetResult> {
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
         let full_path = self.full_path(location);
         self.inner.get_opts(&full_path, options).await
     }

Reply via email to