This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 72f63f21 feat: support persist manifest efficiently (#1596)
72f63f21 is described below

commit 72f63f2173f9da9c25ff241ce7fd497b16f634c1
Author: 鲍金日 <[email protected]>
AuthorDate: Fri Nov 22 11:12:50 2024 +0800

    feat: support persist manifest efficiently (#1596)
    
    ## Rationale
    close #1592
    
    ## Detailed Changes
    - support merging manifest SSTs in the background
    
    ## Test Plan
    CI
    
    ---------
    
    Co-authored-by: jiacai2050 <[email protected]>
---
 horaedb/Cargo.lock                    |   1 +
 horaedb/metric_engine/Cargo.toml      |   1 +
 horaedb/metric_engine/src/manifest.rs | 349 +++++++++++++++++++++++++++++-----
 horaedb/metric_engine/src/storage.rs  |  16 +-
 4 files changed, 320 insertions(+), 47 deletions(-)

diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock
index 4c8aac43..1ab7b2bc 100644
--- a/horaedb/Cargo.lock
+++ b/horaedb/Cargo.lock
@@ -1569,6 +1569,7 @@ dependencies = [
  "temp-dir",
  "thiserror",
  "tokio",
+ "tracing",
 ]
 
 [[package]]
diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index 095532bd..ed237fcb 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -47,6 +47,7 @@ pb_types = { workspace = true }
 prost = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
+tracing = { workspace = true }
 
 [dev-dependencies]
 temp-dir = { workspace = true }
diff --git a/horaedb/metric_engine/src/manifest.rs 
b/horaedb/metric_engine/src/manifest.rs
index 0d865a04..f9e6cef3 100644
--- a/horaedb/metric_engine/src/manifest.rs
+++ b/horaedb/metric_engine/src/manifest.rs
@@ -15,11 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::{
+    collections::HashSet,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+    time::Duration,
+};
+
 use anyhow::Context;
 use bytes::Bytes;
+use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
 use object_store::{path::Path, PutPayload};
 use prost::Message;
-use tokio::sync::RwLock;
+use tokio::sync::{
+    mpsc::{self, Receiver, Sender},
+    RwLock,
+};
+use tracing::error;
 
 use crate::{
     sst::{FileId, FileMeta, SstFile},
@@ -29,11 +43,12 @@ use crate::{
 
 pub const PREFIX_PATH: &str = "manifest";
 pub const SNAPSHOT_FILENAME: &str = "snapshot";
+pub const DELTA_PREFIX: &str = "delta";
 
 pub struct Manifest {
-    path: String,
-    snapshot_path: Path,
+    delta_dir: Path,
     store: ObjectStoreRef,
+    merger: Arc<ManifestMerger>,
 
     payload: RwLock<Payload>,
 }
@@ -42,6 +57,15 @@ pub struct Payload {
     files: Vec<SstFile>,
 }
 
+impl Payload {
+    // TODO: we could sort sst files by name asc, then the dedup will be more
+    // efficient
+    pub fn dedup_files(&mut self) {
+        let mut seen = HashSet::with_capacity(self.files.len());
+        self.files.retain(|file| seen.insert(file.id));
+    }
+}
+
 impl TryFrom<pb_types::Manifest> for Payload {
     type Error = Error;
 
@@ -69,63 +93,67 @@ impl From<Payload> for pb_types::Manifest {
 }
 
 impl Manifest {
-    pub async fn try_new(path: String, store: ObjectStoreRef) -> Result<Self> {
-        let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}"));
-        let payload = match store.get(&snapshot_path).await {
-            Ok(v) => {
-                let bytes = v
-                    .bytes()
-                    .await
-                    .context("failed to read manifest snapshot")?;
-                let pb_payload = pb_types::Manifest::decode(bytes)
-                    .context("failed to decode manifest snapshot")?;
-                Payload::try_from(pb_payload)?
-            }
-            Err(err) => {
-                if err.to_string().contains("not found") {
-                    Payload { files: vec![] }
-                } else {
-                    let context = format!("Failed to get manifest snapshot, 
path:{snapshot_path}");
-                    return Err(AnyhowError::new(err).context(context).into());
-                }
-            }
-        };
+    pub async fn try_new(
+        root_dir: String,
+        store: ObjectStoreRef,
+        merge_options: ManifestMergeOptions,
+    ) -> Result<Self> {
+        let snapshot_path = 
Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
+        let delta_dir = Path::from(format!("{root_dir}/{DELTA_PREFIX}"));
+
+        let merger = ManifestMerger::try_new(
+            snapshot_path.clone(),
+            delta_dir.clone(),
+            store.clone(),
+            merge_options,
+        )
+        .await?;
+
+        {
+            let merger = merger.clone();
+            // Start merger in background
+            tokio::spawn(async move {
+                merger.run().await;
+            });
+        }
+
+        let payload = read_snapshot(&store, &snapshot_path).await?;
 
         Ok(Self {
-            path,
-            snapshot_path,
+            delta_dir,
             store,
+            merger,
             payload: RwLock::new(payload),
         })
     }
 
-    // TODO: Now this functions is poorly implemented, we concat new_sst to
-    // snapshot, and upload it back in a whole.
-    // In more efficient way, we can create a new diff file, and do compaction 
in
-    // background to merge them to `snapshot`.
     pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> {
-        let mut payload = self.payload.write().await;
-        let mut tmp_ssts = payload.files.clone();
+        self.merger.maybe_schedule_merge().await?;
+
+        let new_sst_path = Path::from(format!("{}/{id}", self.delta_dir));
         let new_sst = SstFile { id, meta };
-        tmp_ssts.push(new_sst.clone());
-        let pb_manifest = pb_types::Manifest {
-            files: tmp_ssts.into_iter().map(|f| f.into()).collect::<Vec<_>>(),
-        };
 
-        let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
-        pb_manifest
+        let new_sst_payload = pb_types::SstFile::from(new_sst.clone());
+        let mut buf: Vec<u8> = 
Vec::with_capacity(new_sst_payload.encoded_len());
+        new_sst_payload
             .encode(&mut buf)
-            .context("failed to encode manifest")?;
+            .context("failed to encode manifest file")?;
         let put_payload = PutPayload::from_bytes(Bytes::from(buf));
 
-        // 1. Persist the snapshot
+        // 1. Persist the delta manifest
         self.store
-            .put(&self.snapshot_path, put_payload)
+            .put(&new_sst_path, put_payload)
             .await
-            .context("Failed to update manifest")?;
+            .with_context(|| format!("Failed to write delta manifest, 
path:{}", new_sst_path))?;
 
         // 2. Update cached payload
-        payload.files.push(new_sst);
+        {
+            let mut payload = self.payload.write().await;
+            payload.files.push(new_sst);
+        }
+
+        // 3. Update delta files num
+        self.merger.add_delta_num();
 
         Ok(())
     }
@@ -141,3 +169,238 @@ impl Manifest {
             .collect()
     }
 }
+
+enum MergeType {
+    Hard,
+    Soft,
+}
+
+#[derive(Clone)]
+pub struct ManifestMergeOptions {
+    channel_size: usize,
+    merge_interval_seconds: usize,
+    min_merge_threshold: usize,
+    hard_merge_threshold: usize,
+    soft_merge_threshold: usize,
+}
+
+impl Default for ManifestMergeOptions {
+    fn default() -> Self {
+        Self {
+            channel_size: 10,
+            merge_interval_seconds: 5,
+            min_merge_threshold: 10,
+            soft_merge_threshold: 50,
+            hard_merge_threshold: 90,
+        }
+    }
+}
+
+struct ManifestMerger {
+    snapshot_path: Path,
+    delta_dir: Path,
+    store: ObjectStoreRef,
+    sender: Sender<MergeType>,
+    receiver: RwLock<Receiver<MergeType>>,
+    deltas_num: AtomicUsize,
+    merge_options: ManifestMergeOptions,
+}
+
+impl ManifestMerger {
+    async fn try_new(
+        snapshot_path: Path,
+        delta_dir: Path,
+        store: ObjectStoreRef,
+        merge_options: ManifestMergeOptions,
+    ) -> Result<Arc<Self>> {
+        let (tx, rx) = mpsc::channel(merge_options.channel_size);
+        let merger = Self {
+            snapshot_path,
+            delta_dir,
+            store,
+            sender: tx,
+            receiver: RwLock::new(rx),
+            deltas_num: AtomicUsize::new(0),
+            merge_options,
+        };
+        // Merge all delta files when startup
+        merger.do_merge().await?;
+
+        Ok(Arc::new(merger))
+    }
+
+    async fn run(&self) {
+        let merge_interval = 
Duration::from_secs(self.merge_options.merge_interval_seconds as u64);
+        let mut receiver = self.receiver.write().await;
+        loop {
+            tokio::select! {
+                _ = tokio::time::sleep(merge_interval) => {
+                    if self.deltas_num.load(Ordering::Relaxed) > 
self.merge_options.min_merge_threshold {
+                        if let Err(err) = self.do_merge().await {
+                            error!("Failed to merge delta, err:{err}");
+                        }
+                    }
+                }
+                _merge_type = receiver.recv() => {
+                    if self.deltas_num.load(Ordering::Relaxed) > 
self.merge_options.min_merge_threshold {
+                        if let Err(err) = self.do_merge().await {
+                            error!("Failed to merge delta, err:{err}");
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    fn schedule_merge(&self, task: MergeType) {
+        if let Err(err) = self.sender.try_send(task) {
+            error!("Failed to send merge task, err:{err}");
+        }
+    }
+
+    async fn maybe_schedule_merge(&self) -> Result<()> {
+        let current_num = self.deltas_num.load(Ordering::Relaxed);
+
+        if current_num > self.merge_options.hard_merge_threshold {
+            self.schedule_merge(MergeType::Hard);
+            return Err(AnyhowError::msg(format!(
+                "Manifest has too many delta files, value:{current_num}"
+            ))
+            .into());
+        } else if current_num > self.merge_options.soft_merge_threshold {
+            self.schedule_merge(MergeType::Soft);
+        }
+
+        Ok(())
+    }
+
+    fn add_delta_num(&self) {
+        self.deltas_num.fetch_add(1, Ordering::Relaxed);
+    }
+
+    async fn do_merge(&self) -> Result<()> {
+        let paths = self
+            .store
+            .list(Some(&self.delta_dir))
+            .map(|value| {
+                value
+                    .map(|v| v.location)
+                    .context("failed to get delta file path")
+            })
+            .try_collect::<Vec<_>>()
+            .await?;
+        if paths.is_empty() {
+            return Ok(());
+        }
+
+        let mut stream_read = FuturesUnordered::new();
+        for path in paths.clone() {
+            let store = self.store.clone();
+            // TODO: use thread pool to read manifest files
+            let handle = tokio::spawn(async move { read_delta_file(store, 
path).await });
+            stream_read.push(handle);
+        }
+        let mut delta_files = Vec::with_capacity(stream_read.len());
+        while let Some(res) = stream_read.next().await {
+            let res = res.context("Failed to join read delta task")??;
+            delta_files.push(res);
+        }
+
+        let mut payload = read_snapshot(&self.store, 
&self.snapshot_path).await?;
+        payload.files.extend(delta_files);
+        payload.dedup_files();
+
+        let pb_manifest = pb_types::Manifest {
+            files: payload
+                .files
+                .into_iter()
+                .map(|f| f.into())
+                .collect::<Vec<_>>(),
+        };
+        let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
+        pb_manifest
+            .encode(&mut buf)
+            .context("failed to encode manifest")?;
+        let put_payload = PutPayload::from_bytes(Bytes::from(buf));
+
+        // 1. Persist the snapshot
+        self.store
+            .put(&self.snapshot_path, put_payload)
+            .await
+            .context("Failed to update manifest")?;
+
+        // 2. Delete the merged manifest files
+        let mut stream_delete = FuturesUnordered::new();
+        for path in paths {
+            let store = self.store.clone();
+            // TODO: use thread pool to delete sst files
+            let handle = tokio::spawn(async move { delete_delta_file(store, 
path).await });
+            stream_delete.push(handle);
+        }
+
+        while let Some(res) = stream_delete.next().await {
+            match res {
+                Err(e) => {
+                    error!("Failed to join delete delta task, err:{e}")
+                }
+                Ok(v) => {
+                    if let Err(e) = v {
+                        error!("Failed to delete delta, err:{e}")
+                    } else {
+                        self.deltas_num.fetch_sub(1, Ordering::Relaxed);
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+async fn read_snapshot(store: &ObjectStoreRef, path: &Path) -> Result<Payload> 
{
+    match store.get(path).await {
+        Ok(v) => {
+            let bytes = v
+                .bytes()
+                .await
+                .context("failed to read manifest snapshot")?;
+            let pb_payload =
+                pb_types::Manifest::decode(bytes).context("failed to decode 
manifest snapshot")?;
+            Payload::try_from(pb_payload)
+        }
+        Err(err) => {
+            if err.to_string().contains("not found") {
+                Ok(Payload { files: vec![] })
+            } else {
+                let context = format!("Failed to get manifest snapshot, 
path:{path}");
+                Err(AnyhowError::new(err).context(context).into())
+            }
+        }
+    }
+}
+
+async fn read_delta_file(store: ObjectStoreRef, sst_path: Path) -> 
Result<SstFile> {
+    let bytes = store
+        .get(&sst_path)
+        .await
+        .with_context(|| format!("failed to get delta file, path:{sst_path}"))?
+        .bytes()
+        .await
+        .with_context(|| format!("failed to read delta file, 
path:{sst_path}"))?;
+
+    let pb_sst = pb_types::SstFile::decode(bytes)
+        .with_context(|| format!("failed to decode delta file, 
path:{sst_path}"))?;
+
+    let sst = SstFile::try_from(pb_sst)
+        .with_context(|| format!("failed to convert delta file, 
path:{sst_path}"))?;
+    Ok(sst)
+}
+
+async fn delete_delta_file(store: ObjectStoreRef, path: Path) -> Result<()> {
+    store
+        .delete(&path)
+        .await
+        .with_context(|| format!("Failed to delete delta files, 
path:{path}"))?;
+
+    Ok(())
+}
diff --git a/horaedb/metric_engine/src/storage.rs 
b/horaedb/metric_engine/src/storage.rs
index 02816088..c8c832d2 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -56,7 +56,7 @@ use parquet::{
 };
 
 use crate::{
-    manifest::Manifest,
+    manifest::{Manifest, ManifestMergeOptions},
     read::{DefaultParquetFileReaderFactory, MergeExec},
     sst::{allocate_id, FileId, FileMeta, SstFile},
     types::{ObjectStoreRef, TimeRange, WriteOptions, WriteResult, 
SEQ_COLUMN_NAME},
@@ -129,10 +129,15 @@ impl CloudObjectStorage {
         arrow_schema: SchemaRef,
         num_primary_keys: usize,
         write_options: WriteOptions,
+        merge_options: ManifestMergeOptions,
     ) -> Result<Self> {
         let manifest_prefix = crate::manifest::PREFIX_PATH;
-        let manifest =
-            Manifest::try_new(format!("{path}/{manifest_prefix}"), 
store.clone()).await?;
+        let manifest = Manifest::try_new(
+            format!("{path}/{manifest_prefix}"),
+            store.clone(),
+            merge_options,
+        )
+        .await?;
         let mut new_fields = arrow_schema.fields.clone().to_vec();
         new_fields.push(Arc::new(Field::new(
             SEQ_COLUMN_NAME,
@@ -409,7 +414,7 @@ mod tests {
     use object_store::local::LocalFileSystem;
 
     use super::*;
-    use crate::{arrow_schema, types::Timestamp};
+    use crate::{arrow_schema, manifest::ManifestMergeOptions, 
types::Timestamp};
 
     #[tokio::test]
     async fn test_build_scan_plan() {
@@ -422,6 +427,7 @@ mod tests {
             schema.clone(),
             1, // num_primary_keys
             WriteOptions::default(),
+            ManifestMergeOptions::default(),
         )
         .await
         .unwrap();
@@ -467,6 +473,7 @@ mod tests {
             schema.clone(),
             2, // num_primary_keys
             WriteOptions::default(),
+            ManifestMergeOptions::default(),
         )
         .await
         .unwrap();
@@ -543,6 +550,7 @@ mod tests {
             schema.clone(),
             1,
             WriteOptions::default(),
+            ManifestMergeOptions::default(),
         )
         .await
         .unwrap();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to