This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new b4ee05ef4 fix(layers/throttle): await limiter before throttled writes
(#6671)
b4ee05ef4 is described below
commit b4ee05ef4c442508f7a0e39d2b072a6160f0772b
Author: TennyZhuang <[email protected]>
AuthorDate: Wed Oct 15 19:41:39 2025 +0800
fix(layers/throttle): await limiter before throttled writes (#6671)
---
core/src/layers/throttle.rs | 46 ++++++++++++++++++++++-----------------------
1 file changed, 22 insertions(+), 24 deletions(-)
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 3e1779123..1394aaaaf 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
use governor::Quota;
use governor::RateLimiter;
-use governor::clock::Clock;
use governor::clock::DefaultClock;
use governor::middleware::NoOpMiddleware;
use governor::state::InMemoryState;
@@ -170,30 +169,29 @@ impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
async fn write(&mut self, bs: Buffer) -> Result<()> {
- let buf_length = NonZeroU32::new(bs.len() as u32).unwrap();
-
- loop {
- match self.limiter.check_n(buf_length) {
- Ok(res) => match res {
- Ok(_) => return self.inner.write(bs).await,
- // the query is valid but the Decider can not accommodate
them.
- Err(not_until) => {
- let _ =
not_until.wait_time_from(DefaultClock::default().now());
- // TODO: Should lock the limiter and wait for the
wait_time, or should let other small requests go first?
-
- // FIXME: we should sleep here.
- // tokio::time::sleep(wait_time).await;
- }
- },
- // the query was invalid as the rate limit parameters can
"never" accommodate the number of cells queried for.
- Err(_) => {
- return Err(Error::new(
- ErrorKind::RateLimited,
- "InsufficientCapacity due to burst size being smaller
than the request size",
- ));
- }
- }
+ let len = bs.len();
+ if len == 0 {
+ return self.inner.write(bs).await;
}
+
+ if len > u32::MAX as usize {
+ return Err(Error::new(
+ ErrorKind::RateLimited,
+ "request size exceeds throttle quota capacity",
+ ));
+ }
+
+ let buf_length =
+ NonZeroU32::new(len as u32).expect("len is non-zero so NonZeroU32
must exist");
+
+ self.limiter.until_n_ready(buf_length).await.map_err(|_| {
+ Error::new(
+ ErrorKind::RateLimited,
+ "burst size is smaller than the request size",
+ )
+ })?;
+
+ self.inner.write(bs).await
}
async fn abort(&mut self) -> Result<()> {