This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bcf0377 fix: get_ranges is not spawned in io-runtime (#1426)
3bcf0377 is described below

commit 3bcf0377a799e0aa10423bb321bf611d61b620e8
Author: WEI Xikai <[email protected]>
AuthorDate: Thu Jan 11 18:13:12 2024 +0800

    fix: get_ranges is not spawned in io-runtime (#1426)
    
    ## Rationale
    `get_ranges` should be executed in the io-runtime, but currently it is
    not.
    
    ## Detailed Changes
    Spawn the `get_ranges` task in the io-runtime.
    
    ## Test Plan
    Should pass the CI.
---
 components/object_store/src/metrics.rs | 72 +++++++++++++++++++++++++++++++---
 1 file changed, 66 insertions(+), 6 deletions(-)

diff --git a/components/object_store/src/metrics.rs b/components/object_store/src/metrics.rs
index 837cc437..8000a9ac 100644
--- a/components/object_store/src/metrics.rs
+++ b/components/object_store/src/metrics.rs
@@ -147,7 +147,16 @@ impl ObjectStore for StoreWithMetrics {
         OBJECT_STORE_THROUGHPUT_HISTOGRAM
             .put
             .observe(bytes.len() as f64);
-        self.store.put(location, bytes).await
+
+        let loc = location.clone();
+        let store = self.store.clone();
+        self.runtime
+            .spawn(async move { store.put(&loc, bytes).await })
+            .await
+            .map_err(|source| StoreError::Generic {
+                store: METRICS,
+                source: Box::new(source),
+            })?
     }
 
     async fn put_multipart(
@@ -228,11 +237,22 @@ impl ObjectStore for StoreWithMetrics {
 
     async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
         let _timer = OBJECT_STORE_DURATION_HISTOGRAM.get_ranges.start_timer();
-        let result = self.store.get_ranges(location, ranges).await?;
+        let store = self.store.clone();
+        let loc = location.clone();
+        let ranges = ranges.to_vec();
+        let result = self
+            .runtime
+            .spawn(async move { store.get_ranges(&loc, &ranges).await })
+            .await
+            .map_err(|e| StoreError::Generic {
+                store: METRICS,
+                source: Box::new(e),
+            })??;
         let len: usize = result.iter().map(|v| v.len()).sum();
         OBJECT_STORE_THROUGHPUT_HISTOGRAM
             .get_ranges
             .observe(len as f64);
+
         Ok(result)
     }
 
@@ -286,25 +306,65 @@ impl ObjectStore for StoreWithMetrics {
 
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
         let _timer = OBJECT_STORE_DURATION_HISTOGRAM.copy.start_timer();
-        self.store.copy(from, to).await
+
+        let store = self.store.clone();
+        let from = from.clone();
+        let to = to.clone();
+        self.runtime
+            .spawn(async move { store.copy(&from, &to).await })
+            .await
+            .map_err(|source| StoreError::Generic {
+                store: METRICS,
+                source: Box::new(source),
+            })?
     }
 
     async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
         let _timer = OBJECT_STORE_DURATION_HISTOGRAM.rename.start_timer();
-        self.store.rename(from, to).await
+
+        let store = self.store.clone();
+        let from = from.clone();
+        let to = to.clone();
+        self.runtime
+            .spawn(async move { store.rename(&from, &to).await })
+            .await
+            .map_err(|source| StoreError::Generic {
+                store: METRICS,
+                source: Box::new(source),
+            })?
     }
 
     async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
         let _timer = OBJECT_STORE_DURATION_HISTOGRAM
             .copy_if_not_exists
             .start_timer();
-        self.store.copy_if_not_exists(from, to).await
+
+        let store = self.store.clone();
+        let from = from.clone();
+        let to = to.clone();
+        self.runtime
+            .spawn(async move { store.copy_if_not_exists(&from, &to).await })
+            .await
+            .map_err(|source| StoreError::Generic {
+                store: METRICS,
+                source: Box::new(source),
+            })?
     }
 
     async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
         let _timer = OBJECT_STORE_DURATION_HISTOGRAM
             .rename_if_not_exists
             .start_timer();
-        self.store.rename_if_not_exists(from, to).await
+
+        let store = self.store.clone();
+        let from = from.clone();
+        let to = to.clone();
+        self.runtime
+            .spawn(async move { store.rename_if_not_exists(&from, &to).await })
+            .await
+            .map_err(|source| StoreError::Generic {
+                store: METRICS,
+                source: Box::new(source),
+            })?
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to