This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new ccb81e371 feat(services/s3): Allow retry for unexpected 499 error (#2453)
ccb81e371 is described below
commit ccb81e371881f74c4b8d11d36072e60bb7bb6f5f
Author: Xuanwo <[email protected]>
AuthorDate: Mon Jun 12 21:13:10 2023 +0800
feat(services/s3): Allow retry for unexpected 499 error (#2453)
* feat: Allow retry for unexpected 499 error
Signed-off-by: Xuanwo <[email protected]>
* Fix wasabi batch retry
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/raw/http_util/client.rs | 15 ++++++++++-----
core/src/services/s3/backend.rs | 3 +--
core/src/services/s3/error.rs | 24 ++++++++++++-----------
core/src/services/wasabi/backend.rs | 14 ++++++++++----
core/src/services/wasabi/error.rs | 38 ++++++++++++++++++++++++++++---------
5 files changed, 63 insertions(+), 31 deletions(-)
diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs
index 22a84c7c8..75137122a 100644
--- a/core/src/raw/http_util/client.rs
+++ b/core/src/raw/http_util/client.rs
@@ -77,7 +77,9 @@ impl HttpClient {
/// Send a request in async way.
pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> {
- let url = req.uri().to_string();
+ // Uri stores all string alike data in `Bytes` which means
+ // the clone here is cheap.
+ let uri = req.uri().clone();
let is_head = req.method() == http::Method::HEAD;
let (parts, body) = req.into_parts();
@@ -86,7 +88,7 @@ impl HttpClient {
.client
.request(
parts.method,
- reqwest::Url::from_str(&url).expect("input request url must be valid"),
+ reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"),
)
.version(parts.version)
.headers(parts.headers);
@@ -113,7 +115,7 @@ impl HttpClient {
let mut oerr = Error::new(ErrorKind::Unexpected, "send async request")
.with_operation("http_util::Client::send_async")
- .with_context("url", &url)
+ .with_context("url", uri.to_string())
.set_source(err);
if is_temporary {
oerr = oerr.set_temporary();
@@ -132,7 +134,10 @@ impl HttpClient {
let mut hr = Response::builder()
.version(resp.version())
- .status(resp.status());
+ .status(resp.status())
+ // Insert uri into response extension so that we can fetch
+ // it later.
+ .extension(uri.clone());
// Swap headers directly instead of copy the entire map.
mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());
@@ -141,7 +146,7 @@ impl HttpClient {
// it to interrupt so we can retry it.
Error::new(ErrorKind::Unexpected, "read data from http stream")
.map(|v| if err.is_body() { v.set_temporary() } else { v })
- .with_context("url", &url)
+ .with_context("url", uri.to_string())
.set_source(err)
});
diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs
index 26da170eb..90f0acc98 100644
--- a/core/src/services/s3/backend.rs
+++ b/core/src/services/s3/backend.rs
@@ -969,14 +969,13 @@ impl Accessor for S3Backend {
let path = build_rel_path(&self.core.root, &i.key);
batched_result.push((path, Ok(RpDelete::default().into())));
}
- // TODO: we should handle those errors with code.
for i in result.error {
let path = build_rel_path(&self.core.root, &i.key);
// set the error kind and mark temporary if retryable
let (kind, retryable) =
parse_s3_error_code(i.code.as_str()).unwrap_or((ErrorKind::Unexpected, false));
- let mut err = Error::new(kind, &format!("{i:?}"));
+ let mut err: Error = Error::new(kind, &format!("{i:?}"));
if retryable {
err = err.set_temporary();
}
diff --git a/core/src/services/s3/error.rs b/core/src/services/s3/error.rs
index eb943cb30..b695711e5 100644
--- a/core/src/services/s3/error.rs
+++ b/core/src/services/s3/error.rs
@@ -17,7 +17,7 @@
use bytes::Buf;
use http::Response;
-use http::StatusCode;
+use http::Uri;
use quick_xml::de;
use serde::Deserialize;
@@ -41,16 +41,14 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
let (parts, body) = resp.into_parts();
let bs = body.bytes().await?;
- let (mut kind, mut retryable) = match parts.status {
- StatusCode::NOT_FOUND => (ErrorKind::NotFound, false),
- StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false),
- StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => {
- (ErrorKind::ConditionNotMatch, false)
- }
- StatusCode::INTERNAL_SERVER_ERROR
- | StatusCode::BAD_GATEWAY
- | StatusCode::SERVICE_UNAVAILABLE
- | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true),
+ let (mut kind, mut retryable) = match parts.status.as_u16() {
+ 403 => (ErrorKind::PermissionDenied, false),
+ 404 => (ErrorKind::NotFound, false),
+ 304 | 412 => (ErrorKind::ConditionNotMatch, false),
+ // Service like R2 could return 499 error with a message like:
+ // Client Disconnect, we should retry it.
+ 499 => (ErrorKind::Unexpected, true),
+ 500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true),
_ => (ErrorKind::Unexpected, false),
};
@@ -68,6 +66,10 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
err = err.set_temporary();
}
+ if let Some(uri) = parts.extensions.get::<Uri>() {
+ err = err.with_context("uri", uri.to_string());
+ }
+
Ok(err)
}
diff --git a/core/src/services/wasabi/backend.rs b/core/src/services/wasabi/backend.rs
index 97fc9cfd4..c54eca161 100644
--- a/core/src/services/wasabi/backend.rs
+++ b/core/src/services/wasabi/backend.rs
@@ -37,6 +37,7 @@ use reqsign::AwsV4Signer;
use super::core::*;
use super::error::parse_error;
+use super::error::parse_wasabi_error_code;
use super::pager::WasabiPager;
use super::writer::WasabiWriter;
use crate::raw::*;
@@ -1101,10 +1102,15 @@ impl Accessor for WasabiBackend {
for i in result.error {
let path = build_rel_path(&self.core.root, &i.key);
- batched_result.push((
- path,
- Err(Error::new(ErrorKind::Unexpected, &format!("{i:?}"))),
- ));
+ // set the error kind and mark temporary if retryable
+ let (kind, retryable) =
+ parse_wasabi_error_code(&i.code).unwrap_or((ErrorKind::Unexpected, false));
+ let mut err: Error = Error::new(kind, &format!("{i:?}"));
+ if retryable {
+ err = err.set_temporary();
+ }
+
+ batched_result.push((path, Err(err)));
}
Ok(RpBatch::new(batched_result))
diff --git a/core/src/services/wasabi/error.rs b/core/src/services/wasabi/error.rs
index 3791ee95a..154d49aa8 100644
--- a/core/src/services/wasabi/error.rs
+++ b/core/src/services/wasabi/error.rs
@@ -58,16 +58,8 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
.map(|err| (format!("{err:?}"), Some(err)))
.unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
- // All possible error code: <https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList>
if let Some(wasabi_err) = wasabi_err {
- (kind, retryable) = match wasabi_err.code.as_str() {
- // > Your socket connection to the server was not read from
- // > or written to within the timeout period."
- //
- // It's Ok for us to retry it again.
- "RequestTimeout" => (ErrorKind::Unexpected, true),
- _ => (kind, retryable),
- }
+ (kind, retryable) = parse_wasabi_error_code(&wasabi_err.code).unwrap_or((kind, retryable));
}
let mut err = Error::new(kind, &message).with_context("response", format!("{parts:?}"));
@@ -79,6 +71,34 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
Ok(err)
}
+/// Returns the Errorkind of this code and whether the error is retryable.
+/// All possible error code: <https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList>
+pub fn parse_wasabi_error_code(code: &str) -> Option<(ErrorKind, bool)> {
+ match code {
+ // > Your socket connection to the server was not read from
+ // > or written to within the timeout period."
+ //
+ // It's Ok for us to retry it again.
+ "RequestTimeout" => Some((ErrorKind::Unexpected, true)),
+ // > An internal error occurred. Try again.
+ "InternalError" => Some((ErrorKind::Unexpected, true)),
+ // > A conflicting conditional operation is currently in progress
+ // > against this resource. Try again.
+ "OperationAborted" => Some((ErrorKind::Unexpected, true)),
+ // > Please reduce your request rate.
+ //
+ // It's Ok to retry since later on the request rate may get reduced.
+ "SlowDown" => Some((ErrorKind::RateLimited, true)),
+ // > Service is unable to handle request.
+ //
+ // ServiceUnavailable is considered a retryable error because it typically
+ // indicates a temporary issue with the service or server, such as high load,
+ // maintenance, or an internal problem.
+ "ServiceUnavailable" => Some((ErrorKind::Unexpected, true)),
+ _ => None,
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;