This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b0d64b6279 Adds send_retry_with_idempotency and retry more kinds of transport errors (#5609)
0b0d64b6279 is described below

commit 0b0d64b6279ac753ec91e32a12b9ee711b6fea29
Author: André Guedes <[email protected]>
AuthorDate: Tue Apr 9 07:43:18 2024 -0300

    Adds send_retry_with_idempotency and retry more kinds of transport errors (#5609)
---
 object_store/src/aws/client.rs       |  17 ++-
 object_store/src/aws/credential.rs   |   4 +-
 object_store/src/aws/dynamo.rs       |   6 +-
 object_store/src/aws/mod.rs          |   8 +-
 object_store/src/azure/client.rs     |  17 ++-
 object_store/src/azure/credential.rs |   4 +-
 object_store/src/client/retry.rs     | 269 ++++++++++++++++++++---------------
 object_store/src/gcp/client.rs       |  32 +++--
 object_store/src/gcp/credential.rs   |  18 +--
 object_store/src/http/client.rs      |   2 +-
 10 files changed, 233 insertions(+), 144 deletions(-)

diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 4d101456fd1..838bef8ac23 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -268,6 +268,7 @@ pub(crate) struct Request<'a> {
     builder: RequestBuilder,
     payload_sha256: Option<Vec<u8>>,
     use_session_creds: bool,
+    idempotent: bool,
 }
 
 impl<'a> Request<'a> {
@@ -285,6 +286,11 @@ impl<'a> Request<'a> {
         Self { builder, ..self }
     }
 
+    pub fn set_idempotent(mut self, idempotent: bool) -> Self {
+        self.idempotent = idempotent;
+        self
+    }
+
     pub async fn send(self) -> Result<Response, RequestError> {
         let credential = match self.use_session_creds {
             true => self.config.get_session_credential().await?,
@@ -298,7 +304,7 @@ impl<'a> Request<'a> {
         let path = self.path.as_ref();
         self.builder
             .with_aws_sigv4(credential.authorizer(), 
self.payload_sha256.as_deref())
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, 
self.idempotent)
             .await
             .context(RetrySnafu { path })
     }
@@ -360,6 +366,7 @@ impl S3Client {
             payload_sha256,
             config: &self.config,
             use_session_creds: true,
+            idempotent: false,
         }
     }
 
@@ -462,7 +469,7 @@ impl S3Client {
             .header(CONTENT_TYPE, "application/xml")
             .body(body)
             .with_aws_sigv4(credential.authorizer(), payload_sha256.as_deref())
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, false)
             .await
             .context(DeleteObjectsRequestSnafu {})?
             .bytes()
@@ -510,6 +517,7 @@ impl S3Client {
             config: &self.config,
             payload_sha256: None,
             use_session_creds: false,
+            idempotent: false,
         }
     }
 
@@ -522,7 +530,7 @@ impl S3Client {
             .request(Method::POST, url)
             .headers(self.config.encryption_headers.clone().into())
             .with_aws_sigv4(credential.authorizer(), None)
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, true)
             .await
             .context(CreateMultipartRequestSnafu)?
             .bytes()
@@ -547,6 +555,7 @@ impl S3Client {
         let response = self
             .put_request(path, data, false)
             .query(&[("partNumber", &part), ("uploadId", upload_id)])
+            .set_idempotent(true)
             .send()
             .await?;
 
@@ -582,7 +591,7 @@ impl S3Client {
             .query(&[("uploadId", upload_id)])
             .body(body)
             .with_aws_sigv4(credential.authorizer(), None)
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, true)
             .await
             .context(CompleteMultipartRequestSnafu)?;
 
diff --git a/object_store/src/aws/credential.rs 
b/object_store/src/aws/credential.rs
index 478e56dd09c..a7d1a9772aa 100644
--- a/object_store/src/aws/credential.rs
+++ b/object_store/src/aws/credential.rs
@@ -517,7 +517,7 @@ async fn instance_creds(
     let token_result = client
         .request(Method::PUT, token_url)
         .header("X-aws-ec2-metadata-token-ttl-seconds", "600") // 10 minute TTL
-        .send_retry(retry_config)
+        .send_retry_with_idempotency(retry_config, true)
         .await;
 
     let token = match token_result {
@@ -607,7 +607,7 @@ async fn web_identity(
             ("Version", "2011-06-15"),
             ("WebIdentityToken", &token),
         ])
-        .send_retry(retry_config)
+        .send_retry_with_idempotency(retry_config, true)
         .await?
         .bytes()
         .await?;
diff --git a/object_store/src/aws/dynamo.rs b/object_store/src/aws/dynamo.rs
index 2390187e7f7..2e60bbad222 100644
--- a/object_store/src/aws/dynamo.rs
+++ b/object_store/src/aws/dynamo.rs
@@ -186,7 +186,11 @@ impl DynamoCommit {
         to: &Path,
     ) -> Result<()> {
         self.conditional_op(client, to, None, || async {
-            client.copy_request(from, to).send().await?;
+            client
+                .copy_request(from, to)
+                .set_idempotent(false)
+                .send()
+                .await?;
             Ok(())
         })
         .await
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 76d01d59704..16af4d3b410 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -159,7 +159,7 @@ impl ObjectStore for AmazonS3 {
         }
 
         match (opts.mode, &self.client.config.conditional_put) {
-            (PutMode::Overwrite, _) => request.do_put().await,
+            (PutMode::Overwrite, _) => 
request.set_idempotent(true).do_put().await,
             (PutMode::Create | PutMode::Update(_), None) => 
Err(Error::NotImplemented),
             (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
                 match request.header(&IF_NONE_MATCH, "*").do_put().await {
@@ -268,7 +268,11 @@ impl ObjectStore for AmazonS3 {
     }
 
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy_request(from, to).send().await?;
+        self.client
+            .copy_request(from, to)
+            .set_idempotent(true)
+            .send()
+            .await?;
         Ok(())
     }
 
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 5be6658beff..0e6af50fbf9 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -172,6 +172,7 @@ struct PutRequest<'a> {
     path: &'a Path,
     config: &'a AzureConfig,
     builder: RequestBuilder,
+    idempotent: bool,
 }
 
 impl<'a> PutRequest<'a> {
@@ -185,12 +186,17 @@ impl<'a> PutRequest<'a> {
         Self { builder, ..self }
     }
 
+    fn set_idempotent(mut self, idempotent: bool) -> Self {
+        self.idempotent = idempotent;
+        self
+    }
+
     async fn send(self) -> Result<Response> {
         let credential = self.config.get_credential().await?;
         let response = self
             .builder
             .with_azure_authorization(&credential, &self.config.account)
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, 
self.idempotent)
             .await
             .context(PutRequestSnafu {
                 path: self.path.as_ref(),
@@ -239,6 +245,7 @@ impl AzureClient {
             path,
             builder,
             config: &self.config,
+            idempotent: false,
         }
     }
 
@@ -247,7 +254,7 @@ impl AzureClient {
         let builder = self.put_request(path, bytes);
 
         let builder = match &opts.mode {
-            PutMode::Overwrite => builder,
+            PutMode::Overwrite => builder.set_idempotent(true),
             PutMode::Create => builder.header(&IF_NONE_MATCH, "*"),
             PutMode::Update(v) => {
                 let etag = v.e_tag.as_ref().context(MissingETagSnafu)?;
@@ -271,6 +278,7 @@ impl AzureClient {
 
         self.put_request(path, data)
             .query(&[("comp", "block"), ("blockid", &block_id)])
+            .set_idempotent(true)
             .send()
             .await?;
 
@@ -287,6 +295,7 @@ impl AzureClient {
         let response = self
             .put_request(path, BlockList { blocks }.to_xml().into())
             .query(&[("comp", "blocklist")])
+            .set_idempotent(true)
             .send()
             .await?;
 
@@ -340,7 +349,7 @@ impl AzureClient {
 
         builder
             .with_azure_authorization(&credential, &self.config.account)
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, true)
             .await
             .map_err(|err| err.error(STORE, from.to_string()))?;
 
@@ -373,7 +382,7 @@ impl AzureClient {
             .body(body)
             .query(&[("restype", "service"), ("comp", "userdelegationkey")])
             .with_azure_authorization(&credential, &self.config.account)
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, true)
             .await
             .context(DelegationKeyRequestSnafu)?
             .bytes()
diff --git a/object_store/src/azure/credential.rs 
b/object_store/src/azure/credential.rs
index 6dc3141b08c..36845bd1d64 100644
--- a/object_store/src/azure/credential.rs
+++ b/object_store/src/azure/credential.rs
@@ -615,7 +615,7 @@ impl TokenProvider for ClientSecretOAuthProvider {
                 ("scope", AZURE_STORAGE_SCOPE),
                 ("grant_type", "client_credentials"),
             ])
-            .send_retry(retry)
+            .send_retry_with_idempotency(retry, true)
             .await
             .context(TokenRequestSnafu)?
             .json()
@@ -797,7 +797,7 @@ impl TokenProvider for WorkloadIdentityOAuthProvider {
                 ("scope", AZURE_STORAGE_SCOPE),
                 ("grant_type", "client_credentials"),
             ])
-            .send_retry(retry)
+            .send_retry_with_idempotency(retry, true)
             .await
             .context(TokenRequestSnafu)?
             .json()
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index e4bb5c9e731..f3fa7153e93 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -166,128 +166,83 @@ impl Default for RetryConfig {
     }
 }
 
-pub trait RetryExt {
-    /// Dispatch a request with the given retry configuration
-    ///
-    /// # Panic
-    ///
-    /// This will panic if the request body is a stream
-    fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, 
Result<Response>>;
-}
-
-impl RetryExt for reqwest::RequestBuilder {
-    fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, 
Result<Response>> {
-        let mut backoff = Backoff::new(&config.backoff);
-        let max_retries = config.max_retries;
-        let retry_timeout = config.retry_timeout;
-
-        let (client, req) = self.build_split();
-        let req = req.expect("request must be valid");
-
-        async move {
-            let mut retries = 0;
-            let now = Instant::now();
-
-            loop {
-                let s = req.try_clone().expect("request body must be 
cloneable");
-                match client.execute(s).await {
-                    Ok(r) => match r.error_for_status_ref() {
-                        Ok(_) if r.status().is_success() => return Ok(r),
-                        Ok(r) if r.status() == StatusCode::NOT_MODIFIED => {
-                            return Err(Error::Client {
+fn send_retry_impl(
+    builder: reqwest::RequestBuilder,
+    config: &RetryConfig,
+    is_idempotent: Option<bool>,
+) -> BoxFuture<'static, Result<Response>> {
+    let mut backoff = Backoff::new(&config.backoff);
+    let max_retries = config.max_retries;
+    let retry_timeout = config.retry_timeout;
+
+    let (client, req) = builder.build_split();
+    let req = req.expect("request must be valid");
+    let is_idempotent = is_idempotent.unwrap_or(req.method().is_safe());
+
+    async move {
+        let mut retries = 0;
+        let now = Instant::now();
+
+        loop {
+            let s = req.try_clone().expect("request body must be cloneable");
+            match client.execute(s).await {
+                Ok(r) => match r.error_for_status_ref() {
+                    Ok(_) if r.status().is_success() => return Ok(r),
+                    Ok(r) if r.status() == StatusCode::NOT_MODIFIED => {
+                        return Err(Error::Client {
+                            body: None,
+                            status: StatusCode::NOT_MODIFIED,
+                        })
+                    }
+                    Ok(r) => {
+                        let is_bare_redirect = r.status().is_redirection() && 
!r.headers().contains_key(LOCATION);
+                        return match is_bare_redirect {
+                            true => Err(Error::BareRedirect),
+                            // Not actually sure if this is reachable, but 
here for completeness
+                            false => Err(Error::Client {
                                 body: None,
-                                status: StatusCode::NOT_MODIFIED,
+                                status: r.status(),
                             })
                         }
-                        Ok(r) => {
-                            let is_bare_redirect = r.status().is_redirection() 
&& !r.headers().contains_key(LOCATION);
-                            return match is_bare_redirect {
-                                true => Err(Error::BareRedirect),
-                                // Not actually sure if this is reachable, but 
here for completeness
-                                false => Err(Error::Client {
-                                    body: None,
-                                    status: r.status(),
-                                })
-                            }
-                        }
-                        Err(e) => {
-                            let status = r.status();
-                            if retries == max_retries
-                                || now.elapsed() > retry_timeout
-                                || !status.is_server_error() {
-
-                                return Err(match status.is_client_error() {
-                                    true => match r.text().await {
-                                        Ok(body) => {
-                                            Error::Client {
-                                                body: Some(body).filter(|b| 
!b.is_empty()),
-                                                status,
-                                            }
-                                        }
-                                        Err(e) => {
-                                            Error::Reqwest {
-                                                retries,
-                                                max_retries,
-                                                elapsed: now.elapsed(),
-                                                retry_timeout,
-                                                source: e,
-                                            }
+                    }
+                    Err(e) => {
+                        let status = r.status();
+                        if retries == max_retries
+                            || now.elapsed() > retry_timeout
+                            || !status.is_server_error() {
+
+                            return Err(match status.is_client_error() {
+                                true => match r.text().await {
+                                    Ok(body) => {
+                                        Error::Client {
+                                            body: Some(body).filter(|b| 
!b.is_empty()),
+                                            status,
                                         }
                                     }
-                                    false => Error::Reqwest {
-                                        retries,
-                                        max_retries,
-                                        elapsed: now.elapsed(),
-                                        retry_timeout,
-                                        source: e,
+                                    Err(e) => {
+                                        Error::Reqwest {
+                                            retries,
+                                            max_retries,
+                                            elapsed: now.elapsed(),
+                                            retry_timeout,
+                                            source: e,
+                                        }
                                     }
-                                });
-                            }
-
-                            let sleep = backoff.next();
-                            retries += 1;
-                            info!(
-                                "Encountered server error, backing off for {} 
seconds, retry {} of {}: {}",
-                                sleep.as_secs_f32(),
-                                retries,
-                                max_retries,
-                                e,
-                            );
-                            tokio::time::sleep(sleep).await;
-                        }
-                    },
-                    Err(e) =>
-                    {
-                        let mut do_retry = false;
-                        if e.is_connect() || ( req.method().is_safe() && 
e.is_timeout()) {
-                            do_retry = true
-                        } else {
-                            let mut source = e.source();
-                            while let Some(e) = source {
-                                if let Some(e) = 
e.downcast_ref::<hyper::Error>() {
-                                    do_retry = e.is_closed() || 
e.is_incomplete_message();
-                                    break
                                 }
-                                source = e.source();
-                            }
+                                false => Error::Reqwest {
+                                    retries,
+                                    max_retries,
+                                    elapsed: now.elapsed(),
+                                    retry_timeout,
+                                    source: e,
+                                }
+                            });
                         }
 
-                        if retries == max_retries
-                            || now.elapsed() > retry_timeout
-                            || !do_retry {
-
-                            return Err(Error::Reqwest {
-                                retries,
-                                max_retries,
-                                elapsed: now.elapsed(),
-                                retry_timeout,
-                                source: e,
-                            })
-                        }
                         let sleep = backoff.next();
                         retries += 1;
                         info!(
-                            "Encountered transport error backing off for {} 
seconds, retry {} of {}: {}", 
+                            "Encountered server error, backing off for {} 
seconds, retry {} of {}: {}",
                             sleep.as_secs_f32(),
                             retries,
                             max_retries,
@@ -295,10 +250,102 @@ impl RetryExt for reqwest::RequestBuilder {
                         );
                         tokio::time::sleep(sleep).await;
                     }
+                },
+                Err(e) =>
+                {
+                    let mut do_retry = false;
+                    if e.is_connect()
+                        || e.is_body()
+                        || (e.is_request() && !e.is_timeout())
+                        || (is_idempotent && e.is_timeout()) {
+                        do_retry = true
+                    } else {
+                        let mut source = e.source();
+                        while let Some(e) = source {
+                            if let Some(e) = e.downcast_ref::<hyper::Error>() {
+                                do_retry = e.is_closed()
+                                    || e.is_incomplete_message()
+                                    || e.is_body_write_aborted()
+                                    || (is_idempotent && e.is_timeout());
+                                break
+                            }
+                            if let Some(e) = 
e.downcast_ref::<std::io::Error>() {
+                                if e.kind() == std::io::ErrorKind::TimedOut {
+                                    do_retry = is_idempotent;
+                                } else {
+                                    do_retry = matches!(
+                                        e.kind(),
+                                        std::io::ErrorKind::ConnectionReset
+                                        | std::io::ErrorKind::ConnectionAborted
+                                        | std::io::ErrorKind::BrokenPipe
+                                        | std::io::ErrorKind::UnexpectedEof
+                                    );
+                                }
+                                break;
+                            }
+                            source = e.source();
+                        }
+                    }
+
+                    if retries == max_retries
+                        || now.elapsed() > retry_timeout
+                        || !do_retry {
+
+                        return Err(Error::Reqwest {
+                            retries,
+                            max_retries,
+                            elapsed: now.elapsed(),
+                            retry_timeout,
+                            source: e,
+                        })
+                    }
+                    let sleep = backoff.next();
+                    retries += 1;
+                    info!(
+                        "Encountered transport error backing off for {} 
seconds, retry {} of {}: {}",
+                        sleep.as_secs_f32(),
+                        retries,
+                        max_retries,
+                        e,
+                    );
+                    tokio::time::sleep(sleep).await;
                 }
             }
         }
-        .boxed()
+    }
+    .boxed()
+}
+
+pub trait RetryExt {
+    /// Dispatch a request with the given retry configuration
+    ///
+    /// # Panic
+    ///
+    /// This will panic if the request body is a stream
+    fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, 
Result<Response>>;
+
+    /// Dispatch a request with the given retry configuration and idempotency
+    ///
+    /// # Panic
+    ///
+    /// This will panic if the request body is a stream
+    fn send_retry_with_idempotency(
+        self,
+        config: &RetryConfig,
+        is_idempotent: bool,
+    ) -> BoxFuture<'static, Result<Response>>;
+}
+
+impl RetryExt for reqwest::RequestBuilder {
+    fn send_retry(self, config: &RetryConfig) -> BoxFuture<'static, 
Result<Response>> {
+        send_retry_impl(self, config, None)
+    }
+    fn send_retry_with_idempotency(
+        self,
+        config: &RetryConfig,
+        is_idempotent: bool,
+    ) -> BoxFuture<'static, Result<Response>> {
+        send_retry_impl(self, config, Some(is_idempotent))
     }
 }
 
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index 3762915b0b0..17404f9d5ac 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -173,6 +173,7 @@ pub struct PutRequest<'a> {
     path: &'a Path,
     config: &'a GoogleCloudStorageConfig,
     builder: RequestBuilder,
+    idempotent: bool,
 }
 
 impl<'a> PutRequest<'a> {
@@ -186,12 +187,17 @@ impl<'a> PutRequest<'a> {
         Self { builder, ..self }
     }
 
+    fn set_idempotent(mut self, idempotent: bool) -> Self {
+        self.idempotent = idempotent;
+        self
+    }
+
     async fn send(self) -> Result<PutResult> {
         let credential = self.config.credentials.get_credential().await?;
         let response = self
             .builder
             .bearer_auth(&credential.bearer)
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, 
self.idempotent)
             .await
             .context(PutRequestSnafu {
                 path: self.path.as_ref(),
@@ -281,7 +287,7 @@ impl GoogleCloudStorageClient {
             .post(&url)
             .bearer_auth(&credential.bearer)
             .json(&body)
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, true)
             .await
             .context(SignBlobRequestSnafu)?;
 
@@ -329,6 +335,7 @@ impl GoogleCloudStorageClient {
             path,
             builder,
             config: &self.config,
+            idempotent: false,
         }
     }
 
@@ -336,7 +343,7 @@ impl GoogleCloudStorageClient {
         let builder = self.put_request(path, data);
 
         let builder = match &opts.mode {
-            PutMode::Overwrite => builder,
+            PutMode::Overwrite => builder.set_idempotent(true),
             PutMode::Create => builder.header(&VERSION_MATCH, "0"),
             PutMode::Update(v) => {
                 let etag = v.version.as_ref().context(MissingVersionSnafu)?;
@@ -366,7 +373,12 @@ impl GoogleCloudStorageClient {
             ("partNumber", &format!("{}", part_idx + 1)),
             ("uploadId", upload_id),
         ];
-        let result = self.put_request(path, data).query(query).send().await?;
+        let result = self
+            .put_request(path, data)
+            .query(query)
+            .set_idempotent(true)
+            .send()
+            .await?;
 
         Ok(PartId {
             content_id: result.e_tag.unwrap(),
@@ -391,7 +403,7 @@ impl GoogleCloudStorageClient {
             .header(header::CONTENT_TYPE, content_type)
             .header(header::CONTENT_LENGTH, "0")
             .query(&[("uploads", "")])
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, true)
             .await
             .context(PutRequestSnafu {
                 path: path.as_ref(),
@@ -432,7 +444,11 @@ impl GoogleCloudStorageClient {
     ) -> Result<PutResult> {
         if completed_parts.is_empty() {
             // GCS doesn't allow empty multipart uploads
-            let result = self.put_request(path, 
Default::default()).send().await?;
+            let result = self
+                .put_request(path, Default::default())
+                .set_idempotent(true)
+                .send()
+                .await?;
             self.multipart_cleanup(path, multipart_id).await?;
             return Ok(result);
         }
@@ -456,7 +472,7 @@ impl GoogleCloudStorageClient {
             .bearer_auth(&credential.bearer)
             .query(&[("uploadId", upload_id)])
             .body(data)
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, true)
             .await
             .context(CompleteMultipartRequestSnafu)?;
 
@@ -515,7 +531,7 @@ impl GoogleCloudStorageClient {
             // Needed if reqwest is compiled with native-tls instead of 
rustls-tls
             // See https://github.com/apache/arrow-rs/pull/3921
             .header(header::CONTENT_LENGTH, 0)
-            .send_retry(&self.config.retry_config)
+            .send_retry_with_idempotency(&self.config.retry_config, 
!if_not_exists)
             .await
             .map_err(|err| match err.status() {
                 Some(StatusCode::PRECONDITION_FAILED) => 
crate::Error::AlreadyExists {
diff --git a/object_store/src/gcp/credential.rs 
b/object_store/src/gcp/credential.rs
index fcd516a1bf1..158716ce4c1 100644
--- a/object_store/src/gcp/credential.rs
+++ b/object_store/src/gcp/credential.rs
@@ -623,7 +623,7 @@ impl TokenProvider for AuthorizedUserCredentials {
                 ("client_secret", &self.client_secret),
                 ("refresh_token", &self.refresh_token),
             ])
-            .send_retry(retry)
+            .send_retry_with_idempotency(retry, true)
             .await
             .context(TokenRequestSnafu)?
             .json::<TokenResponse>()
@@ -709,12 +709,12 @@ impl GCSAuthorizer {
     /// Canonicalizes query parameters into the GCP canonical form
     /// form like:
     ///```plaintext
-    ///HTTP_VERB  
-    ///PATH_TO_RESOURCE  
-    ///CANONICAL_QUERY_STRING  
-    ///CANONICAL_HEADERS  
+    ///HTTP_VERB
+    ///PATH_TO_RESOURCE
+    ///CANONICAL_QUERY_STRING
+    ///CANONICAL_HEADERS
     ///
-    ///SIGNED_HEADERS  
+    ///SIGNED_HEADERS
     ///PAYLOAD
     ///```
     ///
@@ -780,9 +780,9 @@ impl GCSAuthorizer {
     ///construct the string to sign
     ///form like:
     ///```plaintext
-    ///SIGNING_ALGORITHM  
-    ///ACTIVE_DATETIME  
-    ///CREDENTIAL_SCOPE  
+    ///SIGNING_ALGORITHM
+    ///ACTIVE_DATETIME
+    ///CREDENTIAL_SCOPE
     ///HASHED_CANONICAL_REQUEST
     ///```
     ///`ACTIVE_DATETIME` format:`YYYYMMDD'T'HHMMSS'Z'`
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index 8700775fb24..fdc8751c1ca 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -189,7 +189,7 @@ impl Client {
             .client
             .request(method, url)
             .header("Depth", depth)
-            .send_retry(&self.retry_config)
+            .send_retry_with_idempotency(&self.retry_config, true)
             .await;
 
         let response = match result {

Reply via email to