This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 21765246 feat: use thread pool in manifest merge (#1599)
21765246 is described below

commit 21765246a6160094830ef2a2ce85dc849288fcac
Author: 鲍金日 <[email protected]>
AuthorDate: Mon Nov 25 14:13:21 2024 +0800

    feat: use thread pool in manifest merge (#1599)
    
    ## Rationale
    
    
    ## Detailed Changes
    
    
    ## Test Plan
    CI
---
 horaedb/Cargo.lock                    |  34 ++++-
 horaedb/Cargo.toml                    |   1 +
 horaedb/metric_engine/Cargo.toml      |   1 +
 horaedb/metric_engine/src/manifest.rs | 233 ++++++++++++++++++++++++----------
 horaedb/metric_engine/src/sst.rs      |   4 +-
 horaedb/metric_engine/src/storage.rs  |  50 ++++++--
 horaedb/metric_engine/src/types.rs    |  41 +++++-
 7 files changed, 281 insertions(+), 83 deletions(-)

diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock
index 1ab7b2bc..1d68e849 100644
--- a/horaedb/Cargo.lock
+++ b/horaedb/Cargo.lock
@@ -329,6 +329,17 @@ dependencies = [
  "zstd-safe",
 ]
 
+[[package]]
+name = "async-scoped"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4042078ea593edffc452eef14e99fdb2b120caa4ad9618bcdeabc4a023b98740"
+dependencies = [
+ "futures",
+ "pin-project",
+ "tokio",
+]
+
 [[package]]
 name = "async-trait"
 version = "0.1.82"
@@ -1530,7 +1541,7 @@ dependencies = [
 
 [[package]]
 name = "macros"
-version = "2.1.0"
+version = "2.2.0-dev"
 
 [[package]]
 name = "md-5"
@@ -1555,6 +1566,7 @@ dependencies = [
  "anyhow",
  "arrow",
  "arrow-schema",
+ "async-scoped",
  "async-trait",
  "bytes",
  "datafusion",
@@ -1880,6 +1892,26 @@ dependencies = [
  "siphasher",
 ]
 
+[[package]]
+name = "pin-project"
+version = "1.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "be57f64e946e500c8ee36ef6331845d40a93055567ec57e8fae13efd33759b95"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "pin-project-lite"
 version = "0.2.14"
diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml
index 6f56f090..d40bdafd 100644
--- a/horaedb/Cargo.toml
+++ b/horaedb/Cargo.toml
@@ -47,6 +47,7 @@ itertools = "0.3"
 lazy_static = "1"
 tracing = "0.1"
 tracing-subscriber = "0.3"
+async-scoped = { version = "0.9.0", features = ["use-tokio"] }
 
 # This profile optimizes for good runtime performance.
 [profile.release]
diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index ed237fcb..7235fc68 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -34,6 +34,7 @@ workspace = true
 anyhow = { workspace = true }
 arrow = { workspace = true }
 arrow-schema = { workspace = true }
+async-scoped = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
 datafusion = { workspace = true }
diff --git a/horaedb/metric_engine/src/manifest.rs b/horaedb/metric_engine/src/manifest.rs
index f9e6cef3..e435e699 100644
--- a/horaedb/metric_engine/src/manifest.rs
+++ b/horaedb/metric_engine/src/manifest.rs
@@ -25,19 +25,23 @@ use std::{
 };
 
 use anyhow::Context;
+use async_scoped::TokioScope;
 use bytes::Bytes;
-use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
+use futures::{StreamExt, TryStreamExt};
 use object_store::{path::Path, PutPayload};
 use prost::Message;
-use tokio::sync::{
-    mpsc::{self, Receiver, Sender},
-    RwLock,
+use tokio::{
+    runtime::Runtime,
+    sync::{
+        mpsc::{self, Receiver, Sender},
+        RwLock,
+    },
 };
 use tracing::error;
 
 use crate::{
     sst::{FileId, FileMeta, SstFile},
-    types::{ObjectStoreRef, TimeRange},
+    types::{ManifestMergeOptions, ObjectStoreRef, TimeRange},
     AnyhowError, Error, Result,
 };
 
@@ -96,6 +100,7 @@ impl Manifest {
     pub async fn try_new(
         root_dir: String,
         store: ObjectStoreRef,
+        runtime: Arc<Runtime>,
         merge_options: ManifestMergeOptions,
     ) -> Result<Self> {
         let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
@@ -105,6 +110,7 @@ impl Manifest {
             snapshot_path.clone(),
             delta_dir.clone(),
             store.clone(),
+            runtime.clone(),
             merge_options,
         )
         .await?;
@@ -112,7 +118,7 @@ impl Manifest {
         {
             let merger = merger.clone();
             // Start merger in background
-            tokio::spawn(async move {
+            runtime.spawn(async move {
                 merger.run().await;
             });
         }
@@ -175,31 +181,11 @@ enum MergeType {
     Soft,
 }
 
-#[derive(Clone)]
-pub struct ManifestMergeOptions {
-    channel_size: usize,
-    merge_interval_seconds: usize,
-    min_merge_threshold: usize,
-    hard_merge_threshold: usize,
-    soft_merge_threshold: usize,
-}
-
-impl Default for ManifestMergeOptions {
-    fn default() -> Self {
-        Self {
-            channel_size: 10,
-            merge_interval_seconds: 5,
-            min_merge_threshold: 10,
-            soft_merge_threshold: 50,
-            hard_merge_threshold: 90,
-        }
-    }
-}
-
 struct ManifestMerger {
     snapshot_path: Path,
     delta_dir: Path,
     store: ObjectStoreRef,
+    runtime: Arc<Runtime>,
     sender: Sender<MergeType>,
     receiver: RwLock<Receiver<MergeType>>,
     deltas_num: AtomicUsize,
@@ -211,6 +197,7 @@ impl ManifestMerger {
         snapshot_path: Path,
         delta_dir: Path,
         store: ObjectStoreRef,
+        runtime: Arc<Runtime>,
         merge_options: ManifestMergeOptions,
     ) -> Result<Arc<Self>> {
         let (tx, rx) = mpsc::channel(merge_options.channel_size);
@@ -218,6 +205,7 @@ impl ManifestMerger {
             snapshot_path,
             delta_dir,
             store,
+            runtime,
             sender: tx,
             receiver: RwLock::new(rx),
             deltas_num: AtomicUsize::new(0),
@@ -279,31 +267,21 @@ impl ManifestMerger {
     }
 
     async fn do_merge(&self) -> Result<()> {
-        let paths = self
-            .store
-            .list(Some(&self.delta_dir))
-            .map(|value| {
-                value
-                    .map(|v| v.location)
-                    .context("failed to get delta file path")
-            })
-            .try_collect::<Vec<_>>()
-            .await?;
+        let paths = list_delta_paths(&self.store, &self.delta_dir).await?;
         if paths.is_empty() {
             return Ok(());
         }
 
-        let mut stream_read = FuturesUnordered::new();
-        for path in paths.clone() {
-            let store = self.store.clone();
-            // TODO: use thread pool to read manifest files
-            let handle = tokio::spawn(async move { read_delta_file(store, path).await });
-            stream_read.push(handle);
-        }
-        let mut delta_files = Vec::with_capacity(stream_read.len());
-        while let Some(res) = stream_read.next().await {
-            let res = res.context("Failed to join read delta task")??;
-            delta_files.push(res);
+        let (_, results) = TokioScope::scope_and_block(|scope| {
+            for path in &paths {
+                scope.spawn(async { read_delta_file(&self.store, path).await });
+            }
+        });
+
+        let mut delta_files = Vec::with_capacity(results.len());
+        for res in results {
+            let sst_file = res.context("Failed to join read delta files task")??;
+            delta_files.push(sst_file);
         }
 
         let mut payload = read_snapshot(&self.store, &self.snapshot_path).await?;
@@ -327,21 +305,19 @@ impl ManifestMerger {
         self.store
             .put(&self.snapshot_path, put_payload)
             .await
-            .context("Failed to update manifest")?;
+            .with_context(|| format!("Failed to update manifest, path:{}", self.snapshot_path))?;
 
         // 2. Delete the merged manifest files
-        let mut stream_delete = FuturesUnordered::new();
-        for path in paths {
-            let store = self.store.clone();
-            // TODO: use thread pool to delete sst files
-            let handle = tokio::spawn(async move { delete_delta_file(store, path).await });
-            stream_delete.push(handle);
-        }
+        let (_, results) = TokioScope::scope_and_block(|scope| {
+            for path in &paths {
+                scope.spawn(async { delete_delta_file(&self.store, path).await });
+            }
+        });
 
-        while let Some(res) = stream_delete.next().await {
+        for res in results {
             match res {
                 Err(e) => {
-                    error!("Failed to join delete delta task, err:{e}")
+                    error!("Failed to join delete delta files task, err:{e}")
                 }
                 Ok(v) => {
                     if let Err(e) = v {
@@ -363,25 +339,25 @@ async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> Result<Payload> {
             let bytes = v
                 .bytes()
                 .await
-                .context("failed to read manifest snapshot")?;
-            let pb_payload =
-                pb_types::Manifest::decode(bytes).context("failed to decode manifest snapshot")?;
+                .with_context(|| format!("Failed to read manifest snapshot, path:{path}"))?;
+            let pb_payload = pb_types::Manifest::decode(bytes)
+                .with_context(|| format!("Failed to decode manifest snapshot, path:{path}"))?;
             Payload::try_from(pb_payload)
         }
         Err(err) => {
             if err.to_string().contains("not found") {
                 Ok(Payload { files: vec![] })
             } else {
-                let context = format!("Failed to get manifest snapshot, path:{path}");
+                let context = format!("Failed to read manifest snapshot, path:{path}");
                 Err(AnyhowError::new(err).context(context).into())
             }
         }
     }
 }
 
-async fn read_delta_file(store: ObjectStoreRef, sst_path: Path) -> Result<SstFile> {
+async fn read_delta_file(store: &ObjectStoreRef, sst_path: &Path) -> Result<SstFile> {
     let bytes = store
-        .get(&sst_path)
+        .get(sst_path)
         .await
         .with_context(|| format!("failed to get delta file, path:{sst_path}"))?
         .bytes()
@@ -396,11 +372,136 @@ async fn read_delta_file(store: ObjectStoreRef, sst_path: Path) -> Result<SstFil
     Ok(sst)
 }
 
-async fn delete_delta_file(store: ObjectStoreRef, path: Path) -> Result<()> {
+async fn delete_delta_file(store: &ObjectStoreRef, path: &Path) -> Result<()> {
     store
-        .delete(&path)
+        .delete(path)
         .await
         .with_context(|| format!("Failed to delete delta files, path:{path}"))?;
 
     Ok(())
 }
+
+async fn list_delta_paths(store: &ObjectStoreRef, delta_dir: &Path) -> Result<Vec<Path>> {
+    let paths = store
+        .list(Some(delta_dir))
+        .map(|value| {
+            value
+                .map(|v| v.location)
+                .with_context(|| format!("Failed to list delta paths, delta dir:{}", delta_dir))
+        })
+        .try_collect::<Vec<_>>()
+        .await?;
+
+    Ok(paths)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{sync::Arc, thread::sleep};
+
+    use object_store::local::LocalFileSystem;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_find_manifest() {
+        let root_dir = temp_dir::TempDir::new().unwrap();
+        let runtime = tokio::runtime::Runtime::new().unwrap();
+        let store = Arc::new(LocalFileSystem::new());
+
+        let manifest = Manifest::try_new(
+            root_dir.path().to_string_lossy().to_string(),
+            store,
+            Arc::new(runtime),
+            ManifestMergeOptions::default(),
+        )
+        .await
+        .unwrap();
+
+        for i in 0..20 {
+            let time_range = (i..i + 1).into();
+            let meta = FileMeta {
+                max_sequence: i as u64,
+                num_rows: i as u32,
+                size: i as u32,
+                time_range,
+            };
+            manifest.add_file(i as u64, meta).await.unwrap();
+        }
+
+        let find_range = (10..15).into();
+        let mut ssts = manifest.find_ssts(&find_range).await;
+
+        let mut expected_ssts = (10..15)
+            .map(|i| {
+                let id = i as u64;
+                let time_range = (i..i + 1).into();
+                let meta = FileMeta {
+                    max_sequence: i as u64,
+                    num_rows: i as u32,
+                    size: i as u32,
+                    time_range,
+                };
+                SstFile { id, meta }
+            })
+            .collect::<Vec<_>>();
+
+        expected_ssts.sort_by(|a, b| a.id.cmp(&b.id));
+        ssts.sort_by(|a, b| a.id.cmp(&b.id));
+        assert_eq!(expected_ssts, ssts);
+    }
+
+    #[tokio::test]
+    async fn test_merge_manifest() {
+        let root_dir = temp_dir::TempDir::new()
+            .unwrap()
+            .path()
+            .to_string_lossy()
+            .to_string();
+        let snapshot_path = Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
+        let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}"));
+        let runtime = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(4)
+            .enable_all()
+            .build()
+            .unwrap();
+        let store: ObjectStoreRef = Arc::new(LocalFileSystem::new());
+
+        let manifest = Manifest::try_new(
+            root_dir,
+            store.clone(),
+            Arc::new(runtime),
+            ManifestMergeOptions {
+                merge_interval_seconds: 1,
+                ..Default::default()
+            },
+        )
+        .await
+        .unwrap();
+
+        // Add manifest files
+        for i in 0..20 {
+            let time_range = (i..i + 1).into();
+            let meta = FileMeta {
+                max_sequence: i as u64,
+                num_rows: i as u32,
+                size: i as u32,
+                time_range,
+            };
+            manifest.add_file(i as u64, meta).await.unwrap();
+        }
+
+        // Wait for merge manifest to finish
+        sleep(Duration::from_secs(2));
+
+        let mut mem_ssts = manifest.payload.read().await.files.clone();
+        let mut ssts = read_snapshot(&store, &snapshot_path).await.unwrap().files;
+
+        mem_ssts.sort_by(|a, b| a.id.cmp(&b.id));
+        ssts.sort_by(|a, b| a.id.cmp(&b.id));
+        assert_eq!(mem_ssts, ssts);
+
+        let delta_paths = list_delta_paths(&store, &delta_dir).await.unwrap();
+        assert!(delta_paths.is_empty());
+    }
+}
diff --git a/horaedb/metric_engine/src/sst.rs b/horaedb/metric_engine/src/sst.rs
index 703b4bc4..644988f8 100644
--- a/horaedb/metric_engine/src/sst.rs
+++ b/horaedb/metric_engine/src/sst.rs
@@ -31,7 +31,7 @@ pub const PREFIX_PATH: &str = "data";
 
 pub type FileId = u64;
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub struct SstFile {
     pub id: FileId,
     pub meta: FileMeta,
@@ -58,7 +58,7 @@ impl From<SstFile> for pb_types::SstFile {
     }
 }
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub struct FileMeta {
     pub max_sequence: u64,
     pub num_rows: u32,
diff --git a/horaedb/metric_engine/src/storage.rs b/horaedb/metric_engine/src/storage.rs
index c8c832d2..ac37e9c8 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -54,12 +54,16 @@ use parquet::{
     format::SortingColumn,
     schema::types::ColumnPath,
 };
+use tokio::runtime::Runtime;
 
 use crate::{
-    manifest::{Manifest, ManifestMergeOptions},
+    manifest::Manifest,
     read::{DefaultParquetFileReaderFactory, MergeExec},
     sst::{allocate_id, FileId, FileMeta, SstFile},
-    types::{ObjectStoreRef, TimeRange, WriteOptions, WriteResult, SEQ_COLUMN_NAME},
+    types::{
+        ObjectStoreRef, RuntimeOptions, StorageOptions, TimeRange, WriteOptions, WriteResult,
+        SEQ_COLUMN_NAME,
+    },
     Result,
 };
 
@@ -93,6 +97,25 @@ pub trait TimeMergeStorage {
     async fn compact(&self, req: CompactRequest) -> Result<()>;
 }
 
+struct StorageRuntimes {
+    compact_runtime: Arc<Runtime>,
+}
+
+impl StorageRuntimes {
+    pub fn new(runtime_opts: RuntimeOptions) -> Result<Self> {
+        let compact_runtime = tokio::runtime::Builder::new_multi_thread()
+            .thread_name("storage-compact")
+            .worker_threads(runtime_opts.compact_thread_num)
+            .enable_all()
+            .build()
+            .context("build storgae compact runtime")?;
+
+        Ok(Self {
+            compact_runtime: Arc::new(compact_runtime),
+        })
+    }
+}
+
 /// `TimeMergeStorage` implementation using cloud object storage, it will split
 /// data into different segments(aka `segment_duration`) based time range.
 ///
@@ -105,6 +128,7 @@ pub struct CloudObjectStorage {
     arrow_schema: SchemaRef,
     num_primary_keys: usize,
     manifest: Manifest,
+    runtimes: StorageRuntimes,
 
     df_schema: DFSchema,
     write_props: WriterProperties,
@@ -128,14 +152,16 @@ impl CloudObjectStorage {
         store: ObjectStoreRef,
         arrow_schema: SchemaRef,
         num_primary_keys: usize,
-        write_options: WriteOptions,
-        merge_options: ManifestMergeOptions,
+        storage_opts: StorageOptions,
     ) -> Result<Self> {
         let manifest_prefix = crate::manifest::PREFIX_PATH;
+        let runtimes = StorageRuntimes::new(storage_opts.runtime_opts)?;
+
         let manifest = Manifest::try_new(
             format!("{path}/{manifest_prefix}"),
             store.clone(),
-            merge_options,
+            runtimes.compact_runtime.clone(),
+            storage_opts.manifest_merge_opts,
         )
         .await?;
         let mut new_fields = arrow_schema.fields.clone().to_vec();
@@ -149,7 +175,7 @@ impl CloudObjectStorage {
             arrow_schema.metadata.clone(),
         ));
         let df_schema = DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
-        let write_props = Self::build_write_props(write_options, num_primary_keys);
+        let write_props = Self::build_write_props(storage_opts.write_opts, num_primary_keys);
         Ok(Self {
             path,
             num_primary_keys,
@@ -157,6 +183,7 @@ impl CloudObjectStorage {
             store,
             arrow_schema,
             manifest,
+            runtimes,
             df_schema,
             write_props,
         })
@@ -414,7 +441,7 @@ mod tests {
     use object_store::local::LocalFileSystem;
 
     use super::*;
-    use crate::{arrow_schema, manifest::ManifestMergeOptions, types::Timestamp};
+    use crate::{arrow_schema, types::Timestamp};
 
     #[tokio::test]
     async fn test_build_scan_plan() {
@@ -426,8 +453,7 @@ mod tests {
             store,
             schema.clone(),
             1, // num_primary_keys
-            WriteOptions::default(),
-            ManifestMergeOptions::default(),
+            StorageOptions::default(),
         )
         .await
         .unwrap();
@@ -472,8 +498,7 @@ mod tests {
             store,
             schema.clone(),
             2, // num_primary_keys
-            WriteOptions::default(),
-            ManifestMergeOptions::default(),
+            StorageOptions::default(),
         )
         .await
         .unwrap();
@@ -549,8 +574,7 @@ mod tests {
             store,
             schema.clone(),
             1,
-            WriteOptions::default(),
-            ManifestMergeOptions::default(),
+            StorageOptions::default(),
         )
         .await
         .unwrap();
diff --git a/horaedb/metric_engine/src/types.rs b/horaedb/metric_engine/src/types.rs
index 624b8a2e..98ba9764 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -68,7 +68,7 @@ impl Timestamp {
     pub const MIN: Timestamp = Timestamp(i64::MIN);
 }
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub struct TimeRange(Range<Timestamp>);
 
 impl From<Range<Timestamp>> for TimeRange {
@@ -146,3 +146,42 @@ impl Default for WriteOptions {
         }
     }
 }
+
+pub struct RuntimeOptions {
+    pub compact_thread_num: usize,
+}
+
+impl Default for RuntimeOptions {
+    fn default() -> Self {
+        Self {
+            compact_thread_num: 4,
+        }
+    }
+}
+
+pub struct ManifestMergeOptions {
+    pub channel_size: usize,
+    pub merge_interval_seconds: usize,
+    pub min_merge_threshold: usize,
+    pub hard_merge_threshold: usize,
+    pub soft_merge_threshold: usize,
+}
+
+impl Default for ManifestMergeOptions {
+    fn default() -> Self {
+        Self {
+            channel_size: 10,
+            merge_interval_seconds: 5,
+            min_merge_threshold: 10,
+            soft_merge_threshold: 50,
+            hard_merge_threshold: 90,
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct StorageOptions {
+    pub write_opts: WriteOptions,
+    pub manifest_merge_opts: ManifestMergeOptions,
+    pub runtime_opts: RuntimeOptions,
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to