This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new fb9f447d feat: add exponential backoff when retry (#1486)
fb9f447d is described below
commit fb9f447dee6d6026ce3472146e56b71cfafe247b
Author: MianChen <[email protected]>
AuthorDate: Tue Feb 27 15:27:38 2024 +0800
feat: add exponential backoff when retry (#1486)
## Rationale
Replace the fixed retry interval in `retry_async` with exponential backoff (with random jitter).
## Detailed Changes
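Port the backoff implementation from arrow-rs `object_store`
([backoff.rs](https://github.com/apache/arrow-rs/blob/dfb642809e93c2c1b8343692f4e4b3080000f988/object_store/src/client/backoff.rs#L26)),
bump `rand` from 0.7 to 0.8.5, and migrate `gen_range(a, b)` call sites to `gen_range(a..b)`.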
## Test Plan
Add unit test `test_backoff`; update `test_retry_async` to the new `RetryConfig`.
---------
Co-authored-by: zealchen <[email protected]>
---
Cargo.lock | 15 ++--
Cargo.toml | 2 +-
src/analytic_engine/src/sst/file.rs | 8 +-
src/components/future_ext/Cargo.toml | 1 +
src/components/future_ext/src/lib.rs | 2 +-
src/components/future_ext/src/retry.rs | 123 ++++++++++++++++++++++++--
src/components/skiplist/benches/bench.rs | 8 +-
src/components/table_kv/src/obkv/tests.rs | 2 +-
src/components/table_kv/src/tests.rs | 2 +-
src/meta_client/src/load_balance.rs | 2 +-
src/server/src/grpc/meta_event_service/mod.rs | 8 +-
11 files changed, 147 insertions(+), 26 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index f43a9036..36c989c6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -121,7 +121,7 @@ dependencies = [
"pin-project-lite",
"prometheus 0.12.0",
"prost 0.11.8",
- "rand 0.7.3",
+ "rand 0.8.5",
"remote_engine_client",
"router",
"runtime",
@@ -899,7 +899,7 @@ dependencies = [
"parquet",
"parquet_ext",
"pprof",
- "rand 0.7.3",
+ "rand 0.8.5",
"runtime",
"serde",
"size_ext",
@@ -1532,7 +1532,7 @@ dependencies = [
"macros",
"paste 1.0.12",
"prost 0.11.8",
- "rand 0.7.3",
+ "rand 0.8.5",
"seahash",
"serde",
"serde_json",
@@ -2692,6 +2692,7 @@ dependencies = [
"futures 0.3.28",
"lazy_static",
"prometheus 0.12.0",
+ "rand 0.8.5",
"runtime",
"tokio",
]
@@ -4534,7 +4535,7 @@ dependencies = [
"prometheus 0.12.0",
"prometheus-static-metric",
"prost 0.11.8",
- "rand 0.7.3",
+ "rand 0.8.5",
"runtime",
"serde",
"serde_json",
@@ -6599,7 +6600,7 @@ dependencies = [
"arena",
"bytes",
"criterion",
- "rand 0.7.3",
+ "rand 0.8.5",
"yatp",
]
@@ -7009,7 +7010,7 @@ dependencies = [
"logger",
"macros",
"prost 0.11.8",
- "rand 0.7.3",
+ "rand 0.8.5",
"regex",
"runtime",
"serde",
@@ -7029,7 +7030,7 @@ dependencies = [
"macros",
"obkv-table-client-rs",
"prometheus 0.12.0",
- "rand 0.7.3",
+ "rand 0.8.5",
"serde",
"snafu 0.6.10",
"time_ext",
diff --git a/Cargo.toml b/Cargo.toml
index b41694b3..5bb84523 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -154,7 +154,7 @@ prost = "0.11"
proxy = { path = "src/proxy" }
query_engine = { path = "src/query_engine" }
query_frontend = { path = "src/query_frontend" }
-rand = "0.7"
+rand = "0.8.5"
regex = "1"
remote_engine_client = { path = "src/remote_engine_client" }
reqwest = { version = "0.11", default-features = false, features = [
diff --git a/src/analytic_engine/src/sst/file.rs
b/src/analytic_engine/src/sst/file.rs
index f2aa9ec6..39cdc7c7 100644
--- a/src/analytic_engine/src/sst/file.rs
+++ b/src/analytic_engine/src/sst/file.rs
@@ -34,7 +34,7 @@ use common_types::{
time::{TimeRange, Timestamp},
SequenceNumber,
};
-use future_ext::{retry_async, RetryConfig};
+use future_ext::{retry_async, BackoffConfig, RetryConfig};
use logger::{error, info, trace, warn};
use macros::define_result;
use metric_ext::Meter;
@@ -540,7 +540,11 @@ pub struct FilePurger {
impl FilePurger {
const RETRY_CONFIG: RetryConfig = RetryConfig {
max_retries: 3,
- interval: Duration::from_millis(500),
+ backoff: BackoffConfig {
+ init_backoff: Duration::from_millis(500),
+ max_backoff: Duration::from_secs(5),
+ base: 3.,
+ },
};
pub fn start(runtime: &Runtime, store: ObjectStoreRef) -> Self {
diff --git a/src/components/future_ext/Cargo.toml
b/src/components/future_ext/Cargo.toml
index f9be39c1..1bc72d52 100644
--- a/src/components/future_ext/Cargo.toml
+++ b/src/components/future_ext/Cargo.toml
@@ -34,5 +34,6 @@ workspace = true
futures = { workspace = true }
lazy_static = { workspace = true }
prometheus = { workspace = true }
+rand = { workspace = true }
runtime = { workspace = true }
tokio = { workspace = true, features = ["time"] }
diff --git a/src/components/future_ext/src/lib.rs
b/src/components/future_ext/src/lib.rs
index 083908ed..ab8d71a2 100644
--- a/src/components/future_ext/src/lib.rs
+++ b/src/components/future_ext/src/lib.rs
@@ -21,4 +21,4 @@ mod cancel;
mod retry;
pub use cancel::CancellationSafeFuture;
-pub use retry::{retry_async, RetryConfig};
+pub use retry::{retry_async, BackoffConfig, RetryConfig};
diff --git a/src/components/future_ext/src/retry.rs
b/src/components/future_ext/src/retry.rs
index 43df7e50..ea646b63 100644
--- a/src/components/future_ext/src/retry.rs
+++ b/src/components/future_ext/src/retry.rs
@@ -20,35 +20,101 @@
use std::time::Duration;
use futures::Future;
+use rand::prelude::*;
-// TODO: add backoff
-//
https://github.com/apache/arrow-rs/blob/dfb642809e93c2c1b8343692f4e4b3080000f988/object_store/src/client/backoff.rs#L26
pub struct RetryConfig {
pub max_retries: usize,
- pub interval: Duration,
+ pub backoff: BackoffConfig,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
- interval: Duration::from_millis(500),
+ backoff: Default::default(),
+ }
+ }
+}
+
+// This backoff implementation is ported from
+//
https://github.com/apache/arrow-rs/blob/dfb642809e93c2c1b8343692f4e4b3080000f988/object_store/src/client/backoff.rs#L26
+pub struct BackoffConfig {
+ /// The initial backoff duration
+ pub init_backoff: Duration,
+ /// The maximum backoff duration
+ pub max_backoff: Duration,
+ /// The base of the exponential to use
+ pub base: f64,
+}
+
+impl Default for BackoffConfig {
+ fn default() -> Self {
+ Self {
+ init_backoff: Duration::from_millis(100),
+ max_backoff: Duration::from_secs(15),
+ base: 2.,
}
}
}
+pub struct Backoff { // Jittered exponential backoff state built from a `BackoffConfig`
+ init_backoff: f64, // seconds; lower bound of every random draw in `next()`
+ next_backoff_secs: f64, // seconds; the value the next call to `next()` returns
+ max_backoff_secs: f64, // seconds; cap applied to each newly drawn backoff
+ base: f64, // multiplier for the upper end of the draw range
+ rng: Option<Box<dyn RngCore + Sync + Send>>, // `None` means `thread_rng()` is used
+}
+
+impl Backoff {
+ /// Create a new [`Backoff`] from the provided [`BackoffConfig`], drawing jitter from [`rand::thread_rng()`]
+ pub fn new(config: &BackoffConfig) -> Self {
+ Self::new_with_rng(config, None)
+ }
+
+ /// Creates a new `Backoff` with the optional `rng` (e.g. a fixed `StepRng` in tests)
+ ///
+ /// Uses [`rand::thread_rng()`] if no rng is provided
+ pub fn new_with_rng(
+ config: &BackoffConfig,
+ rng: Option<Box<dyn RngCore + Sync + Send>>,
+ ) -> Self {
+ let init_backoff = config.init_backoff.as_secs_f64();
+ Self {
+ init_backoff,
+ next_backoff_secs: init_backoff, // so the first call to `next()` returns `init_backoff`
+ max_backoff_secs: config.max_backoff.as_secs_f64(),
+ base: config.base,
+ rng,
+ }
+ }
+
+ /// Returns the current backoff and draws its successor from `[init_backoff, current * base)`, capped at `max_backoff`. NOTE(review): `gen_range` panics on an empty range, e.g. a zero `init_backoff` or `base <= 1.0` — consider validating the config.
+ pub fn next(&mut self) -> Duration {
+ let range = self.init_backoff..(self.next_backoff_secs * self.base);
+
+ let rand_backoff = match self.rng.as_mut() {
+ Some(rng) => rng.gen_range(range),
+ None => thread_rng().gen_range(range),
+ };
+
+ let next_backoff = self.max_backoff_secs.min(rand_backoff); // cap growth at `max_backoff`
+ Duration::from_secs_f64(std::mem::replace(&mut self.next_backoff_secs,
next_backoff)) // store the new draw and return the previous value
+ }
+}
+
pub async fn retry_async<F, Fut, T, E>(f: F, config: &RetryConfig) ->
Fut::Output
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T, E>>,
{
+ let mut backoff = Backoff::new(&config.backoff);
for _ in 0..config.max_retries {
let result = f().await;
if result.is_ok() {
return result;
}
- tokio::time::sleep(config.interval).await;
+ tokio::time::sleep(backoff.next()).await;
}
f().await
@@ -58,13 +124,15 @@ where
mod tests {
use std::sync::atomic::{AtomicU8, Ordering};
+ use rand::rngs::mock::StepRng;
+
use super::*;
#[tokio::test]
async fn test_retry_async() {
let config = RetryConfig {
max_retries: 3,
- interval: Duration::from_millis(5),
+ backoff: Default::default(), // NOTE(review): the default backoff starts at 100ms (max 15s), so each failed attempt here sleeps for real; a millisecond-scale BackoffConfig would keep this test fast
};
// always fails
@@ -109,4 +177,47 @@ mod tests {
assert_eq!(3, runs.load(Ordering::Relaxed));
}
}
+
+ #[test]
+ fn test_backoff() {
+ let init_backoff_secs = 1.0;
+ let max_backoff_secs = 500.0;
+ let base = 3.0;
+
+ let config = BackoffConfig {
+ init_backoff: Duration::from_secs_f64(init_backoff_secs),
+ max_backoff: Duration::from_secs_f64(max_backoff_secs),
+ base,
+ };
+
+ let assert_fuzzy_eq = |a: f64, b: f64| assert!((b - a).abs() < 0.0001,
"{a} != {b}");
+
+ // Create a static rng that takes the minimum of the range
+ let rng = Box::new(StepRng::new(0, 0));
+ let mut backoff = Backoff::new_with_rng(&config, Some(rng));
+
+ for _ in 0..20 {
+ assert_eq!(backoff.next().as_secs_f64(), init_backoff_secs);
+ }
+
+ // Create a static rng that takes the maximum of the range
+ let rng = Box::new(StepRng::new(u64::MAX, 0));
+ let mut backoff = Backoff::new_with_rng(&config, Some(rng));
+
+ for i in 0..20 {
+ let value = (base.powi(i) *
init_backoff_secs).min(max_backoff_secs);
+ assert_fuzzy_eq(backoff.next().as_secs_f64(), value);
+ }
+
+ // Create a static rng that takes the mid point of the range
+ let rng = Box::new(StepRng::new(u64::MAX / 2, 0));
+ let mut backoff = Backoff::new_with_rng(&config, Some(rng));
+
+ let mut value = init_backoff_secs;
+ for _ in 0..20 {
+ assert_fuzzy_eq(backoff.next().as_secs_f64(), value);
+ value =
+ (init_backoff_secs + (value * base - init_backoff_secs) /
2.).min(max_backoff_secs);
+ }
+ }
}
diff --git a/src/components/skiplist/benches/bench.rs
b/src/components/skiplist/benches/bench.rs
index 66c51156..b048f737 100644
--- a/src/components/skiplist/benches/bench.rs
+++ b/src/components/skiplist/benches/bench.rs
@@ -76,13 +76,13 @@ fn bench_read_write_skiplist_frac(b: &mut Bencher<'_>,
frac: &usize) {
let mut rng = rand::thread_rng();
while !s.load(Ordering::SeqCst) {
let key = random_key(&mut rng);
- let case = (key, frac > rng.gen_range(0, 11));
+ let case = (key, frac > rng.gen_range(0..11));
skiplist_round(&l, &case, &v);
}
});
let mut rng = rand::thread_rng();
b.iter_batched_ref(
- || (random_key(&mut rng), frac > rng.gen_range(0, 11)),
+ || (random_key(&mut rng), frac > rng.gen_range(0..11)),
|case| skiplist_round(&list, case, &value),
BatchSize::SmallInput,
);
@@ -127,7 +127,7 @@ fn bench_read_write_map_frac(b: &mut Bencher<'_>, frac:
&usize) {
let handle = thread::spawn(move || {
let mut rng = rand::thread_rng();
while !thread_stop.load(Ordering::SeqCst) {
- let f = rng.gen_range(0, 11);
+ let f = rng.gen_range(0..11);
let case = (random_key(&mut rng), f < frac);
map_round(&map_in_thread, &case, &v);
}
@@ -135,7 +135,7 @@ fn bench_read_write_map_frac(b: &mut Bencher<'_>, frac:
&usize) {
let mut rng = rand::thread_rng();
b.iter_batched_ref(
|| {
- let f = rng.gen_range(0, 11);
+ let f = rng.gen_range(0..11);
(random_key(&mut rng), f < frac)
},
|case| map_round(&map, case, &value),
diff --git a/src/components/table_kv/src/obkv/tests.rs
b/src/components/table_kv/src/obkv/tests.rs
index 8f21d0f8..96b88bf9 100644
--- a/src/components/table_kv/src/obkv/tests.rs
+++ b/src/components/table_kv/src/obkv/tests.rs
@@ -147,7 +147,7 @@ impl Drop for ObkvTester {
fn random_table_name(prefix: &str) -> String {
let mut rng = thread_rng();
- let v: u32 = rng.gen_range(0, MAX_TABLE_ID);
+ let v: u32 = rng.gen_range(0..MAX_TABLE_ID);
format!("{prefix}_{v}")
}
diff --git a/src/components/table_kv/src/tests.rs
b/src/components/table_kv/src/tests.rs
index 1f95d695..a2e5b69c 100644
--- a/src/components/table_kv/src/tests.rs
+++ b/src/components/table_kv/src/tests.rs
@@ -161,7 +161,7 @@ fn new_memory_tester() -> TableKvTester<MemoryImpl> {
fn random_table_name(prefix: &str) -> String {
let mut rng = thread_rng();
- let v: u32 = rng.gen_range(0, MAX_TABLE_ID);
+ let v: u32 = rng.gen_range(0..MAX_TABLE_ID);
format!("{prefix}_{v}")
}
diff --git a/src/meta_client/src/load_balance.rs
b/src/meta_client/src/load_balance.rs
index 5c2a92ac..79ca96b2 100644
--- a/src/meta_client/src/load_balance.rs
+++ b/src/meta_client/src/load_balance.rs
@@ -46,7 +46,7 @@ impl LoadBalancer for RandomLoadBalancer {
return Ok(&addresses[0]);
}
let mut rng = rand::thread_rng();
- let idx = rng.gen_range(0, len);
+ let idx = rng.gen_range(0..len);
Ok(&addresses[idx])
}
diff --git a/src/server/src/grpc/meta_event_service/mod.rs
b/src/server/src/grpc/meta_event_service/mod.rs
index de5a6af0..75fe48ea 100644
--- a/src/server/src/grpc/meta_event_service/mod.rs
+++ b/src/server/src/grpc/meta_event_service/mod.rs
@@ -37,7 +37,7 @@ use common_types::{
schema::SchemaEncoder,
table::{ShardId, ShardVersion},
};
-use future_ext::RetryConfig;
+use future_ext::{BackoffConfig, RetryConfig};
use generic_error::BoxError;
use horaedbproto::meta_event::{
meta_event_service_server::MetaEventService, ChangeShardRoleRequest,
ChangeShardRoleResponse,
@@ -101,7 +101,11 @@ macro_rules! extract_updated_table_info {
// TODO: configure retry
const RETRY: RetryConfig = RetryConfig {
max_retries: 10,
- interval: Duration::from_secs(5),
+ backoff: BackoffConfig {
+ init_backoff: Duration::from_secs(1),
+ max_backoff: Duration::from_secs(5),
+ base: 2.0,
+ },
};
/// Builder for [MetaServiceImpl].
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]