This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch allow-retry-499 in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 9a680829d4490cc44e444739eac97a1dac4eb6fd Author: Xuanwo <[email protected]> AuthorDate: Mon Jun 12 20:27:24 2023 +0800 feat: Allow retry for unexpected 499 error Signed-off-by: Xuanwo <[email protected]> --- core/src/raw/http_util/client.rs | 15 ++++++++++----- core/src/services/s3/error.rs | 24 +++++++++++++----------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 22a84c7c8..75137122a 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -77,7 +77,9 @@ impl HttpClient { /// Send a request in async way. pub async fn send(&self, req: Request<AsyncBody>) -> Result<Response<IncomingAsyncBody>> { - let url = req.uri().to_string(); + // Uri stores all string alike data in `Bytes` which means + // the clone here is cheap. + let uri = req.uri().clone(); let is_head = req.method() == http::Method::HEAD; let (parts, body) = req.into_parts(); @@ -86,7 +88,7 @@ impl HttpClient { .client .request( parts.method, - reqwest::Url::from_str(&url).expect("input request url must be valid"), + reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"), ) .version(parts.version) .headers(parts.headers); @@ -113,7 +115,7 @@ impl HttpClient { let mut oerr = Error::new(ErrorKind::Unexpected, "send async request") .with_operation("http_util::Client::send_async") - .with_context("url", &url) + .with_context("url", uri.to_string()) .set_source(err); if is_temporary { oerr = oerr.set_temporary(); @@ -132,7 +134,10 @@ impl HttpClient { let mut hr = Response::builder() .version(resp.version()) - .status(resp.status()); + .status(resp.status()) + // Insert uri into response extension so that we can fetch + // it later. + .extension(uri.clone()); // Swap headers directly instead of copy the entire map. mem::swap(hr.headers_mut().unwrap(), resp.headers_mut()); @@ -141,7 +146,7 @@ impl HttpClient { // it to interrupt so we can retry it. Error::new(ErrorKind::Unexpected, "read data from http stream") .map(|v| if err.is_body() { v.set_temporary() } else { v }) - .with_context("url", &url) + .with_context("url", uri.to_string()) .set_source(err) }); diff --git a/core/src/services/s3/error.rs b/core/src/services/s3/error.rs index eb943cb30..b695711e5 100644 --- a/core/src/services/s3/error.rs +++ b/core/src/services/s3/error.rs @@ -17,7 +17,7 @@ use bytes::Buf; use http::Response; -use http::StatusCode; +use http::Uri; use quick_xml::de; use serde::Deserialize; @@ -41,16 +41,14 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> { let (parts, body) = resp.into_parts(); let bs = body.bytes().await?; - let (mut kind, mut retryable) = match parts.status { - StatusCode::NOT_FOUND => (ErrorKind::NotFound, false), - StatusCode::FORBIDDEN => (ErrorKind::PermissionDenied, false), - StatusCode::PRECONDITION_FAILED | StatusCode::NOT_MODIFIED => { - (ErrorKind::ConditionNotMatch, false) - } - StatusCode::INTERNAL_SERVER_ERROR - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT => (ErrorKind::Unexpected, true), + let (mut kind, mut retryable) = match parts.status.as_u16() { + 403 => (ErrorKind::PermissionDenied, false), + 404 => (ErrorKind::NotFound, false), + 304 | 412 => (ErrorKind::ConditionNotMatch, false), + // Service like R2 could return 499 error with a message like: + // Client Disconnect, we should retry it. + 499 => (ErrorKind::Unexpected, true), + 500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true), _ => (ErrorKind::Unexpected, false), }; @@ -68,6 +66,10 @@ pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> { err = err.set_temporary(); } + if let Some(uri) = parts.extensions.get::<Uri>() { + err = err.with_context("uri", uri.to_string()); + } + Ok(err) }
