crepererum commented on code in PR #2352:
URL: https://github.com/apache/arrow-rs/pull/2352#discussion_r941013890


##########
object_store/src/client/pagination.rs:
##########
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Result;
+use futures::Stream;
+use std::future::Future;
+
+/// Performs a paginated operation

Review Comment:
   This docstring says close to nothing :grin: 
   
   Maybe add a few words on how a stream is constructed out of a state and an
async function, and what this provides over `unfold` and `poll_fn` (which also
exists for streams). In particular, mention that the output is still paginated:
it is NOT flattened.
   
   Furthermore, I would suggest renaming `list` to `list_fn` or something,
because initially I thought `list` was A list -- not a function to list -- that
somehow gets unrolled here.



##########
object_store/src/aws/credential.rs:
##########
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::retry::RetryExt;
+use crate::client::token::{TemporaryToken, TokenCache};
+use crate::{Result, RetryConfig};
+use bytes::Buf;
+use chrono::{DateTime, Utc};
+use futures::TryFutureExt;
+use reqwest::header::{HeaderMap, HeaderValue};
+use reqwest::{Client, Method, Request, RequestBuilder};
+use serde::Deserialize;
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::time::Instant;
+
+type StdError = Box<dyn std::error::Error + Send + Sync>;
+
+/// SHA256 hash of empty string
+static EMPTY_SHA256_HASH: &str =
+    "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
+
+#[derive(Debug)]
+pub struct AwsCredential {
+    pub key_id: String,
+    pub secret_key: String,
+    pub token: Option<String>,
+}
+
+impl AwsCredential {
+    /// Signs a string
+    ///
+    /// 
<https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html>
+    fn sign(
+        &self,
+        to_sign: &str,
+        date: DateTime<Utc>,
+        region: &str,
+        service: &str,
+    ) -> String {
+        let date_string = date.format("%Y%m%d").to_string();
+        let date_hmac = hmac_sha256(format!("AWS4{}", self.secret_key), 
date_string);
+        let region_hmac = hmac_sha256(date_hmac, region);
+        let service_hmac = hmac_sha256(region_hmac, service);
+        let signing_hmac = hmac_sha256(service_hmac, b"aws4_request");
+        hex_encode(hmac_sha256(signing_hmac, to_sign).as_ref())
+    }
+}
+
+struct RequestSigner<'a> {
+    date: DateTime<Utc>,
+    credential: &'a AwsCredential,
+    service: &'a str,
+    region: &'a str,
+}
+
+const DATE_HEADER: &str = "x-amz-date";
+const HASH_HEADER: &str = "x-amz-content-sha256";
+const TOKEN_HEADER: &str = "x-amz-security-token";
+const AUTH_HEADER: &str = "authorization";
+
+const ALL_HEADERS: &[&str; 4] = &[DATE_HEADER, HASH_HEADER, TOKEN_HEADER, 
AUTH_HEADER];
+
+impl<'a> RequestSigner<'a> {
+    fn sign(&self, request: &mut Request) {
+        if let Some(ref token) = self.credential.token {
+            let token_val = HeaderValue::from_str(token).unwrap();
+            request.headers_mut().insert(TOKEN_HEADER, token_val);
+        }
+
+        let host_val = 
HeaderValue::from_str(request.url().host_str().unwrap()).unwrap();
+        request.headers_mut().insert("host", host_val);
+
+        let date_str = self.date.format("%Y%m%dT%H%M%SZ").to_string();
+        let date_val = HeaderValue::from_str(&date_str).unwrap();
+        request.headers_mut().insert(DATE_HEADER, date_val);
+
+        let digest = match request.body() {
+            None => EMPTY_SHA256_HASH.to_string(),
+            Some(body) => hex_digest(body.as_bytes().unwrap()),
+        };
+
+        let header_digest = HeaderValue::from_str(&digest).unwrap();
+        request.headers_mut().insert(HASH_HEADER, header_digest);
+
+        let (signed_headers, canonical_headers) = 
canonicalize_headers(request.headers());
+
+        // 
https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
+        let canonical_request = format!(
+            "{}\n{}\n{}\n{}\n{}\n{}",
+            request.method().as_str(),
+            request.url().path(), // S3 doesn't percent encode this like other 
services
+            request.url().query().unwrap_or(""), // This assumes the query 
pairs are in order
+            canonical_headers,
+            signed_headers,
+            digest
+        );
+
+        let hashed_canonical_request = 
hex_digest(canonical_request.as_bytes());
+        let scope = format!(
+            "{}/{}/{}/aws4_request",
+            self.date.format("%Y%m%d"),
+            self.region,
+            self.service
+        );
+
+        let string_to_sign = format!(
+            "AWS4-HMAC-SHA256\n{}\n{}\n{}",
+            self.date.format("%Y%m%dT%H%M%SZ"),
+            scope,
+            hashed_canonical_request
+        );
+
+        // sign the string
+        let signature =
+            self.credential
+                .sign(&string_to_sign, self.date, self.region, self.service);
+
+        // build the actual auth header
+        let authorisation = format!(
+            "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, 
Signature={}",
+            self.credential.key_id, scope, signed_headers, signature
+        );
+
+        let authorization_val = HeaderValue::from_str(&authorisation).unwrap();
+        request.headers_mut().insert(AUTH_HEADER, authorization_val);
+    }
+}
+
+pub trait CredentialExt {
+    fn sign(self, credential: &AwsCredential, region: &str, service: &str) -> 
Self;
+}
+
+impl CredentialExt for RequestBuilder {

Review Comment:
   I'm always a bit torn when it comes to extension traits for foreign types.
Sure, they make the code more elegant (I can clearly see that in `client.rs`,
which now feels like rather normal `reqwest` client stuff with very few
parameters), but looking at code that uses them often makes me wonder "what's
that new/fancy method on this type I've worked with before?!". So you can leave
it as it is, just don't overdo it :wink: 
   
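   Also: they are technically prone to break under semantic versioning, because
upstream is free to add new methods (not a breaking change), which may then
collide with this method name. Since the call sites don't use fully qualified
syntax, a new inherent method with the same name would take precedence over the
trait method. The code would then either stop compiling (if the signature
differs) or silently call the upstream method instead. Likewise, a same-named
method from another trait in scope would make the call ambiguous.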
   
   One way to avoid both (my confusion and the potential breakage) would be to
choose a more specific method name like `aws_sign` or `s3_sign`.



##########
object_store/src/aws/client.rs:
##########
@@ -0,0 +1,486 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
+use crate::client::pagination::paginated;
+use crate::client::retry::RetryExt;
+use crate::multipart::UploadPart;
+use crate::path::DELIMITER;
+use crate::util::{format_http_range, format_prefix};
+use crate::{
+    BoxStream, ListResult, MultipartId, ObjectMeta, Path, Result, RetryConfig, 
StreamExt,
+};
+use bytes::{Buf, Bytes};
+use chrono::{DateTime, Utc};
+use percent_encoding::{utf8_percent_encode, AsciiSet, PercentEncode, 
NON_ALPHANUMERIC};
+use reqwest::{Client, Method, Response, StatusCode};

Review Comment:
   ```suggestion
   use reqwest::{Client as ReqwestClient, Method, Response, StatusCode};
   ```



##########
object_store/src/aws/credential.rs:
##########
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::retry::RetryExt;
+use crate::client::token::{TemporaryToken, TokenCache};
+use crate::{Result, RetryConfig};
+use bytes::Buf;
+use chrono::{DateTime, Utc};
+use futures::TryFutureExt;
+use reqwest::header::{HeaderMap, HeaderValue};
+use reqwest::{Client, Method, Request, RequestBuilder};
+use serde::Deserialize;
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::time::Instant;
+
+type StdError = Box<dyn std::error::Error + Send + Sync>;
+
+/// SHA256 hash of empty string
+static EMPTY_SHA256_HASH: &str =
+    "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
+
+#[derive(Debug)]
+pub struct AwsCredential {
+    pub key_id: String,
+    pub secret_key: String,
+    pub token: Option<String>,
+}
+
+impl AwsCredential {
+    /// Signs a string
+    ///
+    /// 
<https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html>
+    fn sign(
+        &self,
+        to_sign: &str,
+        date: DateTime<Utc>,
+        region: &str,
+        service: &str,
+    ) -> String {
+        let date_string = date.format("%Y%m%d").to_string();
+        let date_hmac = hmac_sha256(format!("AWS4{}", self.secret_key), 
date_string);
+        let region_hmac = hmac_sha256(date_hmac, region);
+        let service_hmac = hmac_sha256(region_hmac, service);
+        let signing_hmac = hmac_sha256(service_hmac, b"aws4_request");
+        hex_encode(hmac_sha256(signing_hmac, to_sign).as_ref())
+    }
+}
+
+struct RequestSigner<'a> {
+    date: DateTime<Utc>,
+    credential: &'a AwsCredential,
+    service: &'a str,
+    region: &'a str,
+}
+
+const DATE_HEADER: &str = "x-amz-date";
+const HASH_HEADER: &str = "x-amz-content-sha256";
+const TOKEN_HEADER: &str = "x-amz-security-token";
+const AUTH_HEADER: &str = "authorization";
+
+const ALL_HEADERS: &[&str; 4] = &[DATE_HEADER, HASH_HEADER, TOKEN_HEADER, 
AUTH_HEADER];
+
+impl<'a> RequestSigner<'a> {
+    fn sign(&self, request: &mut Request) {
+        if let Some(ref token) = self.credential.token {
+            let token_val = HeaderValue::from_str(token).unwrap();
+            request.headers_mut().insert(TOKEN_HEADER, token_val);
+        }
+
+        let host_val = 
HeaderValue::from_str(request.url().host_str().unwrap()).unwrap();
+        request.headers_mut().insert("host", host_val);
+
+        let date_str = self.date.format("%Y%m%dT%H%M%SZ").to_string();
+        let date_val = HeaderValue::from_str(&date_str).unwrap();
+        request.headers_mut().insert(DATE_HEADER, date_val);
+
+        let digest = match request.body() {
+            None => EMPTY_SHA256_HASH.to_string(),
+            Some(body) => hex_digest(body.as_bytes().unwrap()),
+        };
+
+        let header_digest = HeaderValue::from_str(&digest).unwrap();
+        request.headers_mut().insert(HASH_HEADER, header_digest);
+
+        let (signed_headers, canonical_headers) = 
canonicalize_headers(request.headers());
+
+        // 
https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
+        let canonical_request = format!(
+            "{}\n{}\n{}\n{}\n{}\n{}",
+            request.method().as_str(),
+            request.url().path(), // S3 doesn't percent encode this like other 
services
+            request.url().query().unwrap_or(""), // This assumes the query 
pairs are in order
+            canonical_headers,
+            signed_headers,
+            digest
+        );
+
+        let hashed_canonical_request = 
hex_digest(canonical_request.as_bytes());
+        let scope = format!(
+            "{}/{}/{}/aws4_request",
+            self.date.format("%Y%m%d"),
+            self.region,
+            self.service
+        );
+
+        let string_to_sign = format!(
+            "AWS4-HMAC-SHA256\n{}\n{}\n{}",
+            self.date.format("%Y%m%dT%H%M%SZ"),
+            scope,
+            hashed_canonical_request
+        );
+
+        // sign the string
+        let signature =
+            self.credential
+                .sign(&string_to_sign, self.date, self.region, self.service);
+
+        // build the actual auth header
+        let authorisation = format!(
+            "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, 
Signature={}",
+            self.credential.key_id, scope, signed_headers, signature
+        );
+
+        let authorization_val = HeaderValue::from_str(&authorisation).unwrap();
+        request.headers_mut().insert(AUTH_HEADER, authorization_val);
+    }
+}
+
+pub trait CredentialExt {
+    fn sign(self, credential: &AwsCredential, region: &str, service: &str) -> 
Self;
+}
+
+impl CredentialExt for RequestBuilder {
+    /// Sign a request 
<https://docs.aws.amazon.com/general/latest/gr/sigv4_signing.html>

Review Comment:
   This doc should go on the trait, not the impl (or on both if the impl is
special).



##########
object_store/src/aws/mod.rs:
##########
@@ -0,0 +1,631 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for S3
+//!
+//! ## Multi-part uploads
+//!
+//! Multi-part uploads can be initiated with the [ObjectStore::put_multipart] 
method.
+//! Data passed to the writer is automatically buffered to meet the minimum 
size
+//! requirements for a part. Multiple parts are uploaded concurrently.
+//!
+//! If the writer fails for any reason, you may have parts uploaded to AWS but 
not
+//! used that you may be charged for. Use the [ObjectStore::abort_multipart] 
method
+//! to abort the upload and drop those unneeded parts. In addition, you may 
wish to
+//! consider implementing [automatic cleanup] of unused parts that are older 
than one
+//! week.
+//!
+//! [automatic cleanup]: 
https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use futures::stream::BoxStream;
+use futures::TryStreamExt;
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::collections::BTreeSet;
+use std::ops::Range;
+use std::sync::Arc;
+use tokio::io::AsyncWrite;
+use tracing::info;
+
+use crate::aws::client::{S3Client, S3Config};
+use crate::aws::credential::{AwsCredential, CredentialProvider};
+use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, 
UploadPart};
+use crate::{
+    GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, Result,
+    RetryConfig, StreamExt,
+};
+
+mod client;
+mod credential;
+
+/// A specialized `Error` for object store-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum Error {
+    #[snafu(display("Last-Modified Header missing from response"))]
+    MissingLastModified,
+
+    #[snafu(display("Content-Length Header missing from response"))]
+    MissingContentLength,
+
+    #[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
+    InvalidLastModified {
+        last_modified: String,
+        source: chrono::ParseError,
+    },
+
+    #[snafu(display("Invalid content length '{}': {}", content_length, 
source))]
+    InvalidContentLength {
+        content_length: String,
+        source: std::num::ParseIntError,
+    },
+
+    #[snafu(display("Missing region"))]
+    MissingRegion,
+
+    #[snafu(display("Missing bucket name"))]
+    MissingBucketName,
+
+    #[snafu(display("Missing AccessKeyId"))]
+    MissingAccessKeyId,
+
+    #[snafu(display("Missing SecretAccessKey"))]
+    MissingSecretAccessKey,
+
+    #[snafu(display("ETag Header missing from response"))]
+    MissingEtag,
+
+    #[snafu(display("Received header containing non-ASCII data"))]
+    BadHeader { source: reqwest::header::ToStrError },
+
+    #[snafu(display("Error reading token file: {}", source))]
+    ReadTokenFile { source: std::io::Error },
+}
+
+impl From<Error> for super::Error {
+    fn from(err: Error) -> Self {
+        Self::Generic {
+            store: "S3",
+            source: Box::new(err),
+        }
+    }
+}
+
+/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
+#[derive(Debug)]
+pub struct AmazonS3 {
+    client: Arc<S3Client>,

Review Comment:
   What's the reason that the `S3Client` is not just inlined into the store
object? It feels a bit like a layer that isn't really necessary.



##########
object_store/src/aws/client.rs:
##########
@@ -0,0 +1,486 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
+use crate::client::pagination::paginated;
+use crate::client::retry::RetryExt;
+use crate::multipart::UploadPart;
+use crate::path::DELIMITER;
+use crate::util::{format_http_range, format_prefix};
+use crate::{
+    BoxStream, ListResult, MultipartId, ObjectMeta, Path, Result, RetryConfig, 
StreamExt,
+};
+use bytes::{Buf, Bytes};
+use chrono::{DateTime, Utc};
+use percent_encoding::{utf8_percent_encode, AsciiSet, PercentEncode, 
NON_ALPHANUMERIC};
+use reqwest::{Client, Method, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+use snafu::{ResultExt, Snafu};
+use std::ops::Range;
+use std::sync::Arc;
+
+// 
http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
+//
+// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
+// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ 
).
+const STRICT_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
+    .remove(b'-')
+    .remove(b'.')
+    .remove(b'_')
+    .remove(b'~');
+
+/// This struct is used to maintain the URI path encoding
+const STRICT_PATH_ENCODE_SET: AsciiSet = STRICT_ENCODE_SET.remove(b'/');
+
+/// A specialized `Error` for object store-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+pub(crate) enum Error {
+    #[snafu(display("Error performing get request {}: {}", path, source))]
+    GetRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    #[snafu(display("Error performing put request {}: {}", path, source))]
+    PutRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    #[snafu(display("Error performing delete request {}: {}", path, source))]
+    DeleteRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    #[snafu(display("Error performing copy request {}: {}", path, source))]
+    CopyRequest {
+        source: reqwest::Error,
+        path: String,
+    },
+
+    #[snafu(display("Error performing list request: {}", source))]
+    ListRequest { source: reqwest::Error },
+
+    #[snafu(display("Error performing create multipart request: {}", source))]
+    CreateMultipartRequest { source: reqwest::Error },
+
+    #[snafu(display("Error performing complete multipart request: {}", 
source))]
+    CompleteMultipartRequest { source: reqwest::Error },
+
+    #[snafu(display("Got invalid list response: {}", source))]
+    InvalidListResponse { source: quick_xml::de::DeError },
+
+    #[snafu(display("Got invalid multipart response: {}", source))]
+    InvalidMultipartResponse { source: quick_xml::de::DeError },
+}
+
+impl From<Error> for crate::Error {
+    fn from(err: Error) -> Self {
+        match err {
+            Error::GetRequest { source, path }
+            | Error::DeleteRequest { source, path }
+            | Error::CopyRequest { source, path }
+            | Error::PutRequest { source, path }
+                if matches!(source.status(), Some(StatusCode::NOT_FOUND)) =>
+            {
+                Self::NotFound {
+                    path,
+                    source: Box::new(source),
+                }
+            }
+            _ => Self::Generic {
+                store: "S3",
+                source: Box::new(err),
+            },
+        }
+    }
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListResponse {
+    #[serde(default)]
+    pub contents: Vec<ListContents>,
+    #[serde(default)]
+    pub common_prefixes: Vec<ListPrefix>,
+    #[serde(default)]
+    pub next_continuation_token: Option<String>,
+}
+
+impl TryFrom<ListResponse> for ListResult {
+    type Error = crate::Error;
+
+    fn try_from(value: ListResponse) -> Result<Self> {
+        let common_prefixes = value
+            .common_prefixes
+            .into_iter()
+            .map(|x| Ok(Path::parse(&x.prefix)?))
+            .collect::<Result<_>>()?;
+
+        let objects = value
+            .contents
+            .into_iter()
+            .map(TryFrom::try_from)
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            common_prefixes,
+            objects,
+        })
+    }
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListPrefix {
+    pub prefix: String,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct ListContents {
+    pub key: String,
+    pub size: usize,
+    pub last_modified: DateTime<Utc>,
+}
+
+impl TryFrom<ListContents> for ObjectMeta {
+    type Error = crate::Error;
+
+    fn try_from(value: ListContents) -> Result<Self> {
+        Ok(Self {
+            location: Path::parse(value.key)?,
+            last_modified: value.last_modified,
+            size: value.size,
+        })
+    }
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+struct InitiateMultipart {
+    upload_id: String,
+}
+
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "PascalCase", rename = "CompleteMultipartUpload")]
+struct CompleteMultipart {
+    part: Vec<MultipartPart>,
+}
+
+#[derive(Debug, Serialize)]
+struct MultipartPart {
+    #[serde(rename = "$unflatten=ETag")]
+    e_tag: String,
+    #[serde(rename = "$unflatten=PartNumber")]
+    part_number: usize,
+}
+
+#[derive(Debug)]
+pub struct S3Config {
+    pub region: String,
+    pub endpoint: String,
+    pub bucket: String,
+    pub credentials: CredentialProvider,
+    pub retry_config: RetryConfig,
+    pub allow_http: bool,
+}
+
+impl S3Config {
+    fn path_url(&self, path: &Path) -> String {
+        format!("{}/{}/{}", self.endpoint, self.bucket, encode_path(path))
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct S3Client {
+    config: S3Config,
+    client: Client,

Review Comment:
   ```suggestion
       reqwest_client: ReqwestClient,
   ```
   
   I was wondering what the client in a client in a client was.



##########
object_store/src/aws/credential.rs:
##########
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   This module makes me a bit unhappy. It's such a PITA that AWS requires all
that stuff. However, I can see you clearly did your homework: it's documented
and the tests work, so I'll just assume it does the right thing.
   
   If you want me to double-check the code against the official docs, I can do
that, but I don't think it's really required.



##########
object_store/src/aws/credential.rs:
##########
@@ -0,0 +1,526 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::retry::RetryExt;
+use crate::client::token::{TemporaryToken, TokenCache};
+use crate::{Result, RetryConfig};
+use bytes::Buf;
+use chrono::{DateTime, Utc};
+use futures::TryFutureExt;
+use reqwest::header::{HeaderMap, HeaderValue};
+use reqwest::{Client, Method, Request, RequestBuilder};
+use serde::Deserialize;
+use std::collections::BTreeMap;
+use std::sync::Arc;
+use std::time::Instant;
+
+type StdError = Box<dyn std::error::Error + Send + Sync>;
+
+/// SHA256 hash of empty string
+static EMPTY_SHA256_HASH: &str =
+    "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
+
+#[derive(Debug)]
+pub struct AwsCredential {
+    pub key_id: String,
+    pub secret_key: String,
+    pub token: Option<String>,
+}
+
+impl AwsCredential {
+    /// Signs a string
+    ///
+    /// 
<https://docs.aws.amazon.com/general/latest/gr/sigv4-calculate-signature.html>
+    fn sign(
+        &self,
+        to_sign: &str,
+        date: DateTime<Utc>,
+        region: &str,
+        service: &str,
+    ) -> String {
+        let date_string = date.format("%Y%m%d").to_string();
+        let date_hmac = hmac_sha256(format!("AWS4{}", self.secret_key), 
date_string);
+        let region_hmac = hmac_sha256(date_hmac, region);
+        let service_hmac = hmac_sha256(region_hmac, service);
+        let signing_hmac = hmac_sha256(service_hmac, b"aws4_request");
+        hex_encode(hmac_sha256(signing_hmac, to_sign).as_ref())
+    }
+}
+
+struct RequestSigner<'a> {
+    date: DateTime<Utc>,
+    credential: &'a AwsCredential,
+    service: &'a str,
+    region: &'a str,
+}
+
+const DATE_HEADER: &str = "x-amz-date";
+const HASH_HEADER: &str = "x-amz-content-sha256";
+const TOKEN_HEADER: &str = "x-amz-security-token";
+const AUTH_HEADER: &str = "authorization";
+
+const ALL_HEADERS: &[&str; 4] = &[DATE_HEADER, HASH_HEADER, TOKEN_HEADER, 
AUTH_HEADER];
+
+impl<'a> RequestSigner<'a> {
+    fn sign(&self, request: &mut Request) {
+        if let Some(ref token) = self.credential.token {
+            let token_val = HeaderValue::from_str(token).unwrap();
+            request.headers_mut().insert(TOKEN_HEADER, token_val);
+        }
+
+        let host_val = 
HeaderValue::from_str(request.url().host_str().unwrap()).unwrap();
+        request.headers_mut().insert("host", host_val);
+
+        let date_str = self.date.format("%Y%m%dT%H%M%SZ").to_string();
+        let date_val = HeaderValue::from_str(&date_str).unwrap();
+        request.headers_mut().insert(DATE_HEADER, date_val);
+
+        let digest = match request.body() {
+            None => EMPTY_SHA256_HASH.to_string(),
+            Some(body) => hex_digest(body.as_bytes().unwrap()),
+        };
+
+        let header_digest = HeaderValue::from_str(&digest).unwrap();
+        request.headers_mut().insert(HASH_HEADER, header_digest);
+
+        let (signed_headers, canonical_headers) = 
canonicalize_headers(request.headers());
+
+        // 
https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
+        let canonical_request = format!(
+            "{}\n{}\n{}\n{}\n{}\n{}",
+            request.method().as_str(),
+            request.url().path(), // S3 doesn't percent encode this like other 
services
+            request.url().query().unwrap_or(""), // This assumes the query 
pairs are in order
+            canonical_headers,
+            signed_headers,
+            digest
+        );
+
+        let hashed_canonical_request = 
hex_digest(canonical_request.as_bytes());
+        let scope = format!(
+            "{}/{}/{}/aws4_request",
+            self.date.format("%Y%m%d"),
+            self.region,
+            self.service
+        );
+
+        let string_to_sign = format!(
+            "AWS4-HMAC-SHA256\n{}\n{}\n{}",
+            self.date.format("%Y%m%dT%H%M%SZ"),
+            scope,
+            hashed_canonical_request
+        );
+
+        // sign the string
+        let signature =
+            self.credential
+                .sign(&string_to_sign, self.date, self.region, self.service);
+
+        // build the actual auth header
+        let authorisation = format!(
+            "AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, 
Signature={}",
+            self.credential.key_id, scope, signed_headers, signature
+        );
+
+        let authorization_val = HeaderValue::from_str(&authorisation).unwrap();
+        request.headers_mut().insert(AUTH_HEADER, authorization_val);
+    }
+}
+
+pub trait CredentialExt {
+    fn sign(self, credential: &AwsCredential, region: &str, service: &str) -> 
Self;
+}
+
+impl CredentialExt for RequestBuilder {
+    /// Sign a request 
<https://docs.aws.amazon.com/general/latest/gr/sigv4_signing.html>
+    fn sign(mut self, credential: &AwsCredential, region: &str, service: &str) 
-> Self {
+        // Hack around lack of access to underlying request
+        // https://github.com/seanmonstar/reqwest/issues/1212
+        let mut request = self
+            .try_clone()
+            .expect("not stream")
+            .build()
+            .expect("request valid");
+
+        let date = Utc::now();
+        let signer = RequestSigner {
+            date,
+            credential,
+            service,
+            region,
+        };
+
+        signer.sign(&mut request);
+
+        for header in ALL_HEADERS {
+            if let Some(val) = request.headers_mut().remove(*header) {
+                self = self.header(*header, val)
+            }
+        }
+        self
+    }
+}
+
+fn hmac_sha256(secret: impl AsRef<[u8]>, bytes: impl AsRef<[u8]>) -> 
ring::hmac::Tag {
+    let key = ring::hmac::Key::new(ring::hmac::HMAC_SHA256, secret.as_ref());
+    ring::hmac::sign(&key, bytes.as_ref())
+}
+
+/// Computes the SHA256 digest of `body` returned as a hex encoded string
+fn hex_digest(bytes: &[u8]) -> String {
+    let digest = ring::digest::digest(&ring::digest::SHA256, bytes);
+    hex_encode(digest.as_ref())
+}
+
+/// Returns `bytes` as a lower-case hex encoded string
+fn hex_encode(bytes: &[u8]) -> String {
+    use std::fmt::Write;
+    let mut out = String::with_capacity(bytes.len() * 2);
+    for byte in bytes {
+        // String writing is infallible
+        let _ = write!(out, "{:02x}", byte);
+    }
+    out
+}
+
+/// Canonicalizes headers into the AWS Canonical Form.
+///
+/// 
<http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html>

Review Comment:
   ```suggestion
   /// 
<https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html>
   ```



##########
object_store/src/aws/mod.rs:
##########
@@ -0,0 +1,631 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store implementation for S3
+//!
+//! ## Multi-part uploads
+//!
+//! Multi-part uploads can be initiated with the [ObjectStore::put_multipart] 
method.
+//! Data passed to the writer is automatically buffered to meet the minimum 
size
+//! requirements for a part. Multiple parts are uploaded concurrently.
+//!
+//! If the writer fails for any reason, you may have parts uploaded to AWS but 
not
+//! used that you may be charged for. Use the [ObjectStore::abort_multipart] 
method
+//! to abort the upload and drop those unneeded parts. In addition, you may 
wish to
+//! consider implementing [automatic cleanup] of unused parts that are older 
than one
+//! week.
+//!
+//! [automatic cleanup]: 
https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+use futures::stream::BoxStream;
+use futures::TryStreamExt;
+use snafu::{OptionExt, ResultExt, Snafu};
+use std::collections::BTreeSet;
+use std::ops::Range;
+use std::sync::Arc;
+use tokio::io::AsyncWrite;
+use tracing::info;
+
+use crate::aws::client::{S3Client, S3Config};
+use crate::aws::credential::{AwsCredential, CredentialProvider};
+use crate::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, 
UploadPart};
+use crate::{
+    GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, Result,
+    RetryConfig, StreamExt,
+};
+
+mod client;
+mod credential;
+
+/// A specialized `Error` for object store-related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+enum Error {
+    #[snafu(display("Last-Modified Header missing from response"))]
+    MissingLastModified,
+
+    #[snafu(display("Content-Length Header missing from response"))]
+    MissingContentLength,
+
+    #[snafu(display("Invalid last modified '{}': {}", last_modified, source))]
+    InvalidLastModified {
+        last_modified: String,
+        source: chrono::ParseError,
+    },
+
+    #[snafu(display("Invalid content length '{}': {}", content_length, 
source))]
+    InvalidContentLength {
+        content_length: String,
+        source: std::num::ParseIntError,
+    },
+
+    #[snafu(display("Missing region"))]
+    MissingRegion,
+
+    #[snafu(display("Missing bucket name"))]
+    MissingBucketName,
+
+    #[snafu(display("Missing AccessKeyId"))]
+    MissingAccessKeyId,
+
+    #[snafu(display("Missing SecretAccessKey"))]
+    MissingSecretAccessKey,
+
+    #[snafu(display("ETag Header missing from response"))]
+    MissingEtag,
+
+    #[snafu(display("Received header containing non-ASCII data"))]
+    BadHeader { source: reqwest::header::ToStrError },
+
+    #[snafu(display("Error reading token file: {}", source))]
+    ReadTokenFile { source: std::io::Error },
+}
+
+impl From<Error> for super::Error {
+    fn from(err: Error) -> Self {
+        Self::Generic {
+            store: "S3",
+            source: Box::new(err),
+        }

Review Comment:
   Shouldn't there be at least a `NotFound` error? The generic test suite 
should probably also assert that fetching an unknown object actually results in 
a "not found" error, not a generic one.



##########
.github/workflows/object_store.yml:
##########
@@ -59,6 +59,13 @@ jobs:
         image: localstack/localstack:0.14.4
         ports:
           - 4566:4566
+      ec2-metadata:

Review Comment:
   Why do we need EC2 now? Is this to test that EC2 VMs can fetch S3 
credentials using yet another mechanism[^1]?
   
   [^1]: That's a dig at AWS, not at this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to