This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new ccb81e371 feat(services/s3): Allow retry for unexpected 499 error (#2453)
ccb81e371 is described below

commit ccb81e371881f74c4b8d11d36072e60bb7bb6f5f
Author: Xuanwo <[email protected]>
AuthorDate: Mon Jun 12 21:13:10 2023 +0800

    feat(services/s3): Allow retry for unexpected 499 error (#2453)
    
    * feat: Allow retry for unexpected 499 error
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix wasabi batch retry
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/http_util/client.rs    | 15 ++++++++++-----
 core/src/services/s3/backend.rs     |  3 +--
 core/src/services/s3/error.rs       | 24 ++++++++++++-----------
 core/src/services/wasabi/backend.rs | 14 ++++++++++----
 core/src/services/wasabi/error.rs   | 38 ++++++++++++++++++++++++++++---------
 5 files changed, 63 insertions(+), 31 deletions(-)

diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index 22a84c7c8..75137122a 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -77,7 +77,9 @@ impl HttpClient {
 
     /// Send a request in async way.
     pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> {
-        let url = req.uri().to_string();
+        // Uri stores all string alike data in `Bytes` which means
+        // the clone here is cheap.
+        let uri = req.uri().clone();
         let is_head = req.method() == http::Method::HEAD;
 
         let (parts, body) = req.into_parts();
@@ -86,7 +88,7 @@ impl HttpClient {
             .client
             .request(
                 parts.method,
-                reqwest::Url::from_str(&url).expect("input request url must be valid"),
+                reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"),
             )
             .version(parts.version)
             .headers(parts.headers);
@@ -113,7 +115,7 @@ impl HttpClient {
 
             let mut oerr = Error::new(ErrorKind::Unexpected, "send async request")
                 .with_operation("http_util::Client::send_async")
-                .with_context("url", &url)
+                .with_context("url", uri.to_string())
                 .set_source(err);
             if is_temporary {
                 oerr = oerr.set_temporary();
@@ -132,7 +134,10 @@ impl HttpClient {
 
         let mut hr = Response::builder()
             .version(resp.version())
-            .status(resp.status());
+            .status(resp.status())
+            // Insert uri into response extension so that we can fetch
+            // it later.
+            .extension(uri.clone());
         // Swap headers directly instead of copy the entire map.
         mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
 
@@ -141,7 +146,7 @@ impl HttpClient {
             // it to interrupt so we can retry it.
             Error::new(ErrorKind::Unexpected, "read data from http stream")
                 .map(|v| if err.is_body() { v.set_temporary() } else { v })
-                .with_context("url", &url)
+                .with_context("url", uri.to_string())
                 .set_source(err)
         });
 
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 26da170eb..90f0acc98 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -969,14 +969,13 @@ impl Accessor for S3Backend {
                 let path = build_rel_path(&self.core.root, &i.key);
                 batched_result.push((path, Ok(RpDelete::default().into())));
             }
-            // TODO: we should handle those errors with code.
             for i in result.error {
                 let path = build_rel_path(&self.core.root, &i.key);
 
                 // set the error kind and mark temporary if retryable
                 let (kind, retryable) =
                     parse_s3_error_code(i.code.as_str()).unwrap_or((ErrorKind::Unexpected, false));
-                let mut err = Error::new(kind, &format!("{i:?}"));
+                let mut err: Error = Error::new(kind, &format!("{i:?}"));
                 if retryable {
                     err = err.set_temporary();
                 }
diff --git a/core/src/services/s3/error.rs b/core/src/services/s3/error.rs
index eb943cb30..b695711e5 100644
--- a/core/src/services/s3/error.rs
+++ b/core/src/services/s3/error.rs
@@ -17,7 +17,7 @@
 
 use bytes::Buf;
 use http::Response;
-use http::StatusCode;
+use http::Uri;
 use quick_xml::de;
 use serde::Deserialize;
 
@@ -41,16 +41,14 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
     let (parts, body) = resp.into_parts();
     let bs = body.bytes().await?;
 
-    let (mut kind, mut retryable) = match parts.status {
-        StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
-        StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
-        StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
-            (ErrorKind::ConditionNotMatch, false)
-        }
-        StatusCode::INTERNAL_SERVER_ERROR
-        | StatusCode::BAD_GATEWAY
-        | StatusCode::SERVICE_UNAVAILABLE
-        | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+    let (mut kind, mut retryable) = match parts.status.as_u16() {
+        403 => (ErrorKind::PermissionDenied, false),
+        404 => (ErrorKind::NotFound, false),
+        304 | 412 => (ErrorKind::ConditionNotMatch, false),
+        // Service like R2 could return 499 error with a message like:
+        // Client Disconnect, we should retry it.
+        499 => (ErrorKind::Unexpected, true),
+        500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true),
         _ => (ErrorKind::Unexpected, false),
     };
 
@@ -68,6 +66,10 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
         err = err.set_temporary();
     }
 
+    if let Some(uri) = parts.extensions.get::<Uri>() {
+        err = err.with_context("uri", uri.to_string());
+    }
+
     Ok(err)
 }
 
diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs
index 97fc9cfd4..c54eca161 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -37,6 +37,7 @@ use reqsign::AwsV4Signer;
 
 use super::core::*;
 use super::error::parse_error;
+use super::error::parse_wasabi_error_code;
 use super::pager::WasabiPager;
 use super::writer::WasabiWriter;
 use crate::raw::*;
@@ -1101,10 +1102,15 @@ impl Accessor for WasabiBackend {
             for i in result.error {
                 let path = build_rel_path(&self.core.root, &i.key);
 
-                batched_result.push((
-                    path,
-                    Err(Error::new(ErrorKind::Unexpected, &format!("{i:?}"))),
-                ));
+                // set the error kind and mark temporary if retryable
+                let (kind, retryable) =
+                    parse_wasabi_error_code(&i.code).unwrap_or((ErrorKind::Unexpected, false));
+                let mut err: Error = Error::new(kind, &format!("{i:?}"));
+                if retryable {
+                    err = err.set_temporary();
+                }
+
+                batched_result.push((path, Err(err)));
             }
 
             Ok(RpBatch::new(batched_result))
diff --git a/core/src/services/wasabi/error.rs b/core/src/services/wasabi/error.rs
index 3791ee95a..154d49aa8 100644
--- a/core/src/services/wasabi/error.rs
+++ b/core/src/services/wasabi/error.rs
@@ -58,16 +58,8 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
         .map(|err| (format!("{err:?}"), Some(err)))
         .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
 
-    // All possible error code: <https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList>
     if let Some(wasabi_err) = wasabi_err {
-        (kind, retryable) = match wasabi_err.code.as_str() {
-            // > Your socket connection to the server was not read from
-            // > or written to within the timeout period."
-            //
-            // It's Ok for us to retry it again.
-            "RequestTimeout" => (ErrorKind::Unexpected, true),
-            _ => (kind, retryable),
-        }
+        (kind, retryable) = parse_wasabi_error_code(&wasabi_err.code).unwrap_or((kind, retryable));
     }
 
     let mut err = Error::new(kind, &message).with_context("response", format!("{parts:?}"));
@@ -79,6 +71,34 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
     Ok(err)
 }
 
+/// Returns the Errorkind of this code and whether the error is retryable.
+/// All possible error code: <https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList>
+pub fn parse_wasabi_error_code(code: &str) -> Option<(ErrorKind, bool)> {
+    match code {
+        // > Your socket connection to the server was not read from
+        // > or written to within the timeout period."
+        //
+        // It's Ok for us to retry it again.
+        "RequestTimeout" => Some((ErrorKind::Unexpected, true)),
+        // > An internal error occurred. Try again.
+        "InternalError" => Some((ErrorKind::Unexpected, true)),
+        // > A conflicting conditional operation is currently in progress
+        // > against this resource. Try again.
+        "OperationAborted" => Some((ErrorKind::Unexpected, true)),
+        // > Please reduce your request rate.
+        //
+        // It's Ok to retry since later on the request rate may get reduced.
+        "SlowDown" => Some((ErrorKind::RateLimited, true)),
+        // > Service is unable to handle request.
+        //
+        // ServiceUnavailable is considered a retryable error because it typically
+        // indicates a temporary issue with the service or server, such as high load,
+        // maintenance, or an internal problem.
+        "ServiceUnavailable" => Some((ErrorKind::Unexpected, true)),
+        _ => None,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

Reply via email to