This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 626c3aeb0 fix(object_store): implement get_range() to avoid unnecessary stat() calls (#7192)
626c3aeb0 is described below
commit 626c3aeb02f65e7443675e17d4f21c8808c81522
Author: Krisztián Szűcs <[email protected]>
AuthorDate: Sun Feb 22 06:47:44 2026 +0100
fix(object_store): implement get_range() to avoid unnecessary stat() calls (#7192)
* fix(object_store): implement get_range() to avoid unnecessary stat() calls
* test(object_store): unit test for get_range() not calling stat()
---
integrations/object_store/src/store.rs | 175 +++++++++++++++++++++++++++++++++
1 file changed, 175 insertions(+)
diff --git a/integrations/object_store/src/store.rs b/integrations/object_store/src/store.rs
index 6ae7cac93..a6edce20c 100644
--- a/integrations/object_store/src/store.rs
+++ b/integrations/object_store/src/store.rs
@@ -18,11 +18,13 @@
use std::fmt::{self, Debug, Display, Formatter};
use std::future::IntoFuture;
use std::io;
+use std::ops::Range;
use std::sync::Arc;
use crate::utils::*;
use crate::{datetime_to_timestamp, timestamp_to_datetime};
use async_trait::async_trait;
+use bytes::Bytes;
use futures::FutureExt;
use futures::StreamExt;
use futures::TryStreamExt;
@@ -430,6 +432,29 @@ impl ObjectStore for OpendalStore {
})
}
+ /// Return the bytes that are stored at the specified location
+ /// in the given byte range.
+ ///
+ /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted
+ async fn get_range(&self, location: &Path, range: Range<u64>) -> object_store::Result<Bytes> {
+ // For bounded ranges, we can read directly without calling stat()
+ // This avoids the unnecessary metadata fetch in get_opts
+ let raw_location = percent_decode_path(location.as_ref());
+ let reader = self
+ .inner
+ .reader_with(&raw_location)
+ .into_send()
+ .await
+ .map_err(|err| format_object_store_error(err, location.as_ref()))?;
+
+ reader
+ .read(range.start..range.end)
+ .into_send()
+ .await
+ .map(|buf| buf.to_bytes())
+ .map_err(|err| format_object_store_error(err, location.as_ref()))
+ }
+
async fn delete(&self, location: &Path) -> object_store::Result<()> {
let decoded_location = percent_decode_path(location.as_ref());
self.inner
@@ -858,4 +883,154 @@ mod tests {
"data/test.txt"
);
}
+
+ /// Custom layer that counts stat operations for testing
+ mod stat_counter {
+ use super::*;
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ #[derive(Debug, Clone)]
+ pub struct StatCounterLayer {
+ count: Arc<AtomicUsize>,
+ }
+
+ impl StatCounterLayer {
+ pub fn new(count: Arc<AtomicUsize>) -> Self {
+ Self { count }
+ }
+ }
+
+ impl<A: opendal::raw::Access> opendal::raw::Layer<A> for StatCounterLayer {
+ type LayeredAccess = StatCounterAccessor<A>;
+
+ fn layer(&self, inner: A) -> Self::LayeredAccess {
+ StatCounterAccessor {
+ inner,
+ count: self.count.clone(),
+ }
+ }
+ }
+
+ #[derive(Debug, Clone)]
+ pub struct StatCounterAccessor<A> {
+ inner: A,
+ count: Arc<AtomicUsize>,
+ }
+
+ impl<A: opendal::raw::Access> opendal::raw::LayeredAccess for StatCounterAccessor<A> {
+ type Inner = A;
+ type Reader = A::Reader;
+ type Writer = A::Writer;
+ type Lister = A::Lister;
+ type Deleter = A::Deleter;
+
+ fn inner(&self) -> &Self::Inner {
+ &self.inner
+ }
+
+ async fn stat(
+ &self,
+ path: &str,
+ args: opendal::raw::OpStat,
+ ) -> opendal::Result<opendal::raw::RpStat> {
+ self.count.fetch_add(1, Ordering::SeqCst);
+ self.inner.stat(path, args).await
+ }
+
+ async fn read(
+ &self,
+ path: &str,
+ args: opendal::raw::OpRead,
+ ) -> opendal::Result<(opendal::raw::RpRead, Self::Reader)> {
+ self.inner.read(path, args).await
+ }
+
+ async fn write(
+ &self,
+ path: &str,
+ args: opendal::raw::OpWrite,
+ ) -> opendal::Result<(opendal::raw::RpWrite, Self::Writer)> {
+ self.inner.write(path, args).await
+ }
+
+ async fn delete(&self) -> opendal::Result<(opendal::raw::RpDelete, Self::Deleter)> {
+ self.inner.delete().await
+ }
+
+ async fn list(
+ &self,
+ path: &str,
+ args: opendal::raw::OpList,
+ ) -> opendal::Result<(opendal::raw::RpList, Self::Lister)> {
+ self.inner.list(path, args).await
+ }
+
+ async fn copy(
+ &self,
+ from: &str,
+ to: &str,
+ args: opendal::raw::OpCopy,
+ ) -> opendal::Result<opendal::raw::RpCopy> {
+ self.inner.copy(from, to, args).await
+ }
+
+ async fn rename(
+ &self,
+ from: &str,
+ to: &str,
+ args: opendal::raw::OpRename,
+ ) -> opendal::Result<opendal::raw::RpRename> {
+ self.inner.rename(from, to, args).await
+ }
+ }
+ }
+
+ #[tokio::test]
+ async fn test_get_range_no_stat() {
+ use std::sync::atomic::{AtomicUsize, Ordering};
+
+ // Create a stat counter and operator with tracking layer
+ let stat_count = Arc::new(AtomicUsize::new(0));
+ let op = Operator::new(opendal::services::Memory::default())
+ .unwrap()
+ .layer(stat_counter::StatCounterLayer::new(stat_count.clone()))
+ .finish();
+ let store = OpendalStore::new(op);
+
+ // Create a test file
+ let location = "test_get_range.txt".into();
+ let value = Bytes::from_static(b"Hello, world!");
+ store.put(&location, value.clone().into()).await.unwrap();
+
+ // Reset counter after put
+ stat_count.store(0, Ordering::SeqCst);
+
+ // Test 1: get_range should NOT call stat()
+ let ret = store.get_range(&location, 0..5).await.unwrap();
+ assert_eq!(Bytes::from_static(b"Hello"), ret);
+ assert_eq!(
+ stat_count.load(Ordering::SeqCst),
+ 0,
+ "get_range should not call stat()"
+ );
+
+ // Reset counter
+ stat_count.store(0, Ordering::SeqCst);
+
+ // Test 2: get_opts SHOULD call stat() to get metadata
+ let opts = object_store::GetOptions {
+ range: Some(object_store::GetRange::Bounded(0..5)),
+ ..Default::default()
+ };
+ let ret = store.get_opts(&location, opts).await.unwrap();
+ let data = ret.bytes().await.unwrap();
+ assert_eq!(Bytes::from_static(b"Hello"), data);
+ assert!(
+ stat_count.load(Ordering::SeqCst) > 0,
+ "get_opts should call stat() to get metadata"
+ );
+
+ // Cleanup
+ store.delete(&location).await.unwrap();
+ }
}