jiacai2050 commented on code in PR #1596:
URL: https://github.com/apache/horaedb/pull/1596#discussion_r1846002009


##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -140,4 +173,189 @@ impl Manifest {
             .cloned()
             .collect()
     }
+
+    async fn sender(&self, task: SstsMergeTask) {
+        if let Err(err) = self.sender.send(task).await {
+            error!("Failed to send merge ssts task, err: {:?}", err);
+        }
+    }
+}
+
+enum SstsMergeTask {
+    ForceMergeSsts,
+    MergeSsts(usize),
+}
+
+pub struct SstsMergeOptions {
+    channel_size: usize,
+    max_interval_seconds: usize,
+    merge_threshold: usize,
+}
+
+impl Default for SstsMergeOptions {
+    fn default() -> Self {
+        Self {
+            channel_size: 100,
+            max_interval_seconds: 5,
+            merge_threshold: 50,
+        }
+    }
+}
+
+struct SstsMerge {
+    snapshot_path: Path,
+    sst_path: Path,
+    store: ObjectStoreRef,
+    receiver: Receiver<SstsMergeTask>,
+    sst_num: usize,
+    merge_options: SstsMergeOptions,
+}
+
+impl SstsMerge {
+    async fn try_new(
+        snapshot_path: Path,
+        sst_path: Path,
+        store: ObjectStoreRef,
+        rx: Receiver<SstsMergeTask>,
+        merge_options: SstsMergeOptions,
+    ) -> Result<Self> {
+        let ssts_merge = Self {
+            snapshot_path,
+            sst_path,
+            store,
+            receiver: rx,
+            sst_num: 0,
+            merge_options,
+        };
+        // Merge all ssts when start
+        ssts_merge.merge_ssts().await?;
+
+        Ok(ssts_merge)
+    }
+
+    async fn run(&mut self) {
+        let interval = 
time::Duration::from_secs(self.merge_options.max_interval_seconds as u64);
+        let threshold = self.merge_options.merge_threshold;
+
+        loop {
+            match time::timeout(interval, self.receiver.recv()).await {
+                Ok(Some(SstsMergeTask::ForceMergeSsts)) => match 
self.merge_ssts().await {
+                    Ok(_) => {
+                        self.sst_num = 0;
+                    }
+                    Err(err) => {
+                        error!("Failed to force merge ssts, err: {:?}", err);
+                    }
+                },
+                Ok(Some(SstsMergeTask::MergeSsts(num))) => {
+                    self.sst_num += num;
+                    if self.sst_num >= threshold {
+                        match self.merge_ssts().await {
+                            Ok(_) => {
+                                self.sst_num = 0;
+                            }
+                            Err(err) => {
+                                error!("Failed to merge ssts, err: {:?}", err);
+                            }
+                        }
+                    }
+                }
+                Ok(None) => {
+                    // The channel is disconnected.
+                    info!("Channel disconnected, merge ssts task exit");
+                    break;
+                }
+                Err(_) => {
+                    info!("Timeout receive merge ssts task");
+                }
+            }
+        }
+    }
+
+    async fn merge_ssts(&self) -> Result<()> {
+        let meta_infos = self
+            .store
+            .list(Some(&self.sst_path))
+            .collect::<Vec<_>>()
+            .await;
+        if meta_infos.is_empty() {
+            return Ok(());
+        }
+
+        let mut paths = Vec::with_capacity(meta_infos.len());
+        for meta_info in meta_infos {
+            let path = meta_info
+                .context("failed to get path of manifest sst")?
+                .location;
+            paths.push(path);
+        }
+
+        let mut sst_files = Vec::with_capacity(paths.len());
+        for path in &paths {

Review Comment:
   We could fetch these files concurrently; fetching them one by one is too slow.



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -140,4 +173,189 @@ impl Manifest {
             .cloned()
             .collect()
     }
+
+    async fn sender(&self, task: SstsMergeTask) {
+        if let Err(err) = self.sender.send(task).await {
+            error!("Failed to send merge ssts task, err: {:?}", err);
+        }
+    }
+}
+
+enum SstsMergeTask {
+    ForceMergeSsts,
+    MergeSsts(usize),
+}
+
+pub struct SstsMergeOptions {
+    channel_size: usize,
+    max_interval_seconds: usize,
+    merge_threshold: usize,
+}
+
+impl Default for SstsMergeOptions {
+    fn default() -> Self {
+        Self {
+            channel_size: 100,
+            max_interval_seconds: 5,
+            merge_threshold: 50,
+        }
+    }
+}
+
+struct SstsMerge {
+    snapshot_path: Path,
+    sst_path: Path,
+    store: ObjectStoreRef,
+    receiver: Receiver<SstsMergeTask>,
+    sst_num: usize,
+    merge_options: SstsMergeOptions,
+}
+
+impl SstsMerge {
+    async fn try_new(
+        snapshot_path: Path,
+        sst_path: Path,
+        store: ObjectStoreRef,
+        rx: Receiver<SstsMergeTask>,
+        merge_options: SstsMergeOptions,
+    ) -> Result<Self> {
+        let ssts_merge = Self {
+            snapshot_path,
+            sst_path,
+            store,
+            receiver: rx,
+            sst_num: 0,
+            merge_options,
+        };
+        // Merge all ssts when start
+        ssts_merge.merge_ssts().await?;
+
+        Ok(ssts_merge)
+    }
+
+    async fn run(&mut self) {
+        let interval = 
time::Duration::from_secs(self.merge_options.max_interval_seconds as u64);
+        let threshold = self.merge_options.merge_threshold;
+
+        loop {
+            match time::timeout(interval, self.receiver.recv()).await {
+                Ok(Some(SstsMergeTask::ForceMergeSsts)) => match 
self.merge_ssts().await {
+                    Ok(_) => {
+                        self.sst_num = 0;
+                    }
+                    Err(err) => {
+                        error!("Failed to force merge ssts, err: {:?}", err);
+                    }
+                },
+                Ok(Some(SstsMergeTask::MergeSsts(num))) => {
+                    self.sst_num += num;
+                    if self.sst_num >= threshold {
+                        match self.merge_ssts().await {
+                            Ok(_) => {
+                                self.sst_num = 0;
+                            }
+                            Err(err) => {
+                                error!("Failed to merge ssts, err: {:?}", err);
+                            }
+                        }
+                    }
+                }
+                Ok(None) => {
+                    // The channel is disconnected.
+                    info!("Channel disconnected, merge ssts task exit");
+                    break;
+                }
+                Err(_) => {
+                    info!("Timeout receive merge ssts task");
+                }
+            }
+        }
+    }
+
+    async fn merge_ssts(&self) -> Result<()> {
+        let meta_infos = self
+            .store
+            .list(Some(&self.sst_path))
+            .collect::<Vec<_>>()
+            .await;
+        if meta_infos.is_empty() {
+            return Ok(());
+        }
+
+        let mut paths = Vec::with_capacity(meta_infos.len());
+        for meta_info in meta_infos {
+            let path = meta_info
+                .context("failed to get path of manifest sst")?
+                .location;
+            paths.push(path);
+        }
+
+        let mut sst_files = Vec::with_capacity(paths.len());
+        for path in &paths {
+            let bytes = self
+                .store
+                .get(path)
+                .await
+                .context("failed to read sst file")?
+                .bytes()
+                .await
+                .context("failed to read sst file")?;
+            let pb_sst = pb_types::SstFile::decode(bytes).context("failed to 
decode sst file")?;
+            sst_files.push(SstFile::try_from(pb_sst)?);
+        }
+
+        let mut payload = match self.store.get(&self.snapshot_path).await {
+            Ok(v) => {
+                let bytes = v
+                    .bytes()
+                    .await
+                    .context("failed to read manifest snapshot")?;
+                let pb_payload = pb_types::Manifest::decode(bytes)
+                    .context("failed to decode manifest snapshot")?;
+                Payload::try_from(pb_payload)?
+            }
+            Err(err) => {
+                if err.to_string().contains("not found") {
+                    Payload { files: vec![] }
+                } else {
+                    let context = format!(
+                        "Failed to get manifest snapshot, path:{}",
+                        self.snapshot_path
+                    );
+                    return Err(AnyhowError::new(err).context(context).into());
+                }
+            }
+        };
+
+        payload.files.extend(sst_files.clone());
+        let pb_manifest = pb_types::Manifest {
+            files: payload
+                .files
+                .into_iter()
+                .map(|f| f.into())
+                .collect::<Vec<_>>(),
+        };
+
+        let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
+        pb_manifest
+            .encode(&mut buf)
+            .context("failed to encode manifest")?;
+        let put_payload = PutPayload::from_bytes(Bytes::from(buf));
+
+        // 1. Persist the snapshot
+        self.store
+            .put(&self.snapshot_path, put_payload)
+            .await
+            .context("Failed to update manifest")?;
+
+        // 2. Delete the old sst files
+        for path in &paths {

Review Comment:
   Delete these files concurrently as well.



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -140,4 +173,189 @@ impl Manifest {
             .cloned()
             .collect()
     }
+
+    async fn sender(&self, task: SstsMergeTask) {
+        if let Err(err) = self.sender.send(task).await {
+            error!("Failed to send merge ssts task, err: {:?}", err);
+        }
+    }
+}
+
+enum SstsMergeTask {
+    ForceMergeSsts,
+    MergeSsts(usize),
+}
+
+pub struct SstsMergeOptions {
+    channel_size: usize,
+    max_interval_seconds: usize,
+    merge_threshold: usize,
+}
+
+impl Default for SstsMergeOptions {
+    fn default() -> Self {
+        Self {
+            channel_size: 100,
+            max_interval_seconds: 5,
+            merge_threshold: 50,
+        }
+    }
+}
+
+struct SstsMerge {
+    snapshot_path: Path,
+    sst_path: Path,
+    store: ObjectStoreRef,
+    receiver: Receiver<SstsMergeTask>,
+    sst_num: usize,
+    merge_options: SstsMergeOptions,
+}
+
+impl SstsMerge {
+    async fn try_new(
+        snapshot_path: Path,
+        sst_path: Path,
+        store: ObjectStoreRef,
+        rx: Receiver<SstsMergeTask>,
+        merge_options: SstsMergeOptions,
+    ) -> Result<Self> {
+        let ssts_merge = Self {
+            snapshot_path,
+            sst_path,
+            store,
+            receiver: rx,
+            sst_num: 0,
+            merge_options,
+        };
+        // Merge all ssts when start
+        ssts_merge.merge_ssts().await?;
+
+        Ok(ssts_merge)
+    }
+
+    async fn run(&mut self) {
+        let interval = 
time::Duration::from_secs(self.merge_options.max_interval_seconds as u64);
+        let threshold = self.merge_options.merge_threshold;
+
+        loop {
+            match time::timeout(interval, self.receiver.recv()).await {
+                Ok(Some(SstsMergeTask::ForceMergeSsts)) => match 
self.merge_ssts().await {
+                    Ok(_) => {
+                        self.sst_num = 0;
+                    }
+                    Err(err) => {
+                        error!("Failed to force merge ssts, err: {:?}", err);
+                    }
+                },
+                Ok(Some(SstsMergeTask::MergeSsts(num))) => {
+                    self.sst_num += num;
+                    if self.sst_num >= threshold {
+                        match self.merge_ssts().await {
+                            Ok(_) => {
+                                self.sst_num = 0;
+                            }
+                            Err(err) => {
+                                error!("Failed to merge ssts, err: {:?}", err);
+                            }
+                        }
+                    }
+                }
+                Ok(None) => {
+                    // The channel is disconnected.
+                    info!("Channel disconnected, merge ssts task exit");
+                    break;
+                }
+                Err(_) => {
+                    info!("Timeout receive merge ssts task");
+                }
+            }
+        }
+    }
+
+    async fn merge_ssts(&self) -> Result<()> {
+        let meta_infos = self
+            .store
+            .list(Some(&self.sst_path))
+            .collect::<Vec<_>>()
+            .await;
+        if meta_infos.is_empty() {
+            return Ok(());
+        }
+
+        let mut paths = Vec::with_capacity(meta_infos.len());
+        for meta_info in meta_infos {
+            let path = meta_info
+                .context("failed to get path of manifest sst")?
+                .location;
+            paths.push(path);
+        }
+
+        let mut sst_files = Vec::with_capacity(paths.len());
+        for path in &paths {
+            let bytes = self
+                .store
+                .get(path)
+                .await
+                .context("failed to read sst file")?
+                .bytes()
+                .await
+                .context("failed to read sst file")?;
+            let pb_sst = pb_types::SstFile::decode(bytes).context("failed to 
decode sst file")?;
+            sst_files.push(SstFile::try_from(pb_sst)?);
+        }
+
+        let mut payload = match self.store.get(&self.snapshot_path).await {
+            Ok(v) => {
+                let bytes = v
+                    .bytes()
+                    .await
+                    .context("failed to read manifest snapshot")?;
+                let pb_payload = pb_types::Manifest::decode(bytes)
+                    .context("failed to decode manifest snapshot")?;
+                Payload::try_from(pb_payload)?
+            }
+            Err(err) => {
+                if err.to_string().contains("not found") {
+                    Payload { files: vec![] }
+                } else {
+                    let context = format!(
+                        "Failed to get manifest snapshot, path:{}",
+                        self.snapshot_path
+                    );
+                    return Err(AnyhowError::new(err).context(context).into());
+                }
+            }
+        };
+
+        payload.files.extend(sst_files.clone());
+        let pb_manifest = pb_types::Manifest {
+            files: payload
+                .files
+                .into_iter()
+                .map(|f| f.into())
+                .collect::<Vec<_>>(),
+        };
+
+        let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
+        pb_manifest
+            .encode(&mut buf)
+            .context("failed to encode manifest")?;
+        let put_payload = PutPayload::from_bytes(Bytes::from(buf));
+
+        // 1. Persist the snapshot
+        self.store
+            .put(&self.snapshot_path, put_payload)
+            .await
+            .context("Failed to update manifest")?;
+
+        // 2. Delete the old sst files
+        for path in &paths {
+            self.store
+                .delete(path)
+                .await
+                .context("failed to delete sst file")?;

Review Comment:
   Add the path name to the log/error context.



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -69,8 +82,29 @@ impl From<Payload> for pb_types::Manifest {
 }
 
 impl Manifest {
-    pub async fn try_new(path: String, store: ObjectStoreRef) -> Result<Self> {
+    pub async fn try_new(
+        path: String,
+        store: ObjectStoreRef,
+        merge_options: SstsMergeOptions,
+    ) -> Result<Self> {
         let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}"));
+        let sst_path = Path::from(format!("{path}/{SST_PREFIX}"));
+
+        let (tx, rx) = mpsc::channel(merge_options.channel_size);
+        let mut ssts_merge = SstsMerge::try_new(
+            snapshot_path.clone(),
+            sst_path.clone(),
+            store.clone(),
+            rx,
+            merge_options,
+        )
+        .await?;
+
+        // Start merge ssts task
+        tokio::spawn(async move {
+            ssts_merge.run().await;
+        });
+

Review Comment:
   On startup, we also need to read the delta files. If we only read the snapshot, won't some SSTs be missing from later queries?



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -140,4 +173,189 @@ impl Manifest {
             .cloned()
             .collect()
     }
+
+    async fn sender(&self, task: SstsMergeTask) {
+        if let Err(err) = self.sender.send(task).await {
+            error!("Failed to send merge ssts task, err: {:?}", err);
+        }
+    }
+}
+
+enum SstsMergeTask {
+    ForceMergeSsts,
+    MergeSsts(usize),
+}
+
+pub struct SstsMergeOptions {
+    channel_size: usize,
+    max_interval_seconds: usize,
+    merge_threshold: usize,
+}
+
+impl Default for SstsMergeOptions {
+    fn default() -> Self {
+        Self {
+            channel_size: 100,
+            max_interval_seconds: 5,
+            merge_threshold: 50,

Review Comment:
   A merge threshold of 10-20 would be a more reasonable default.



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -90,43 +124,42 @@ impl Manifest {
                 }
             }
         };
+        let payload = Arc::new(RwLock::new(payload));
 
         Ok(Self {
             path,
             snapshot_path,
+            sst_path,
             store,
-            payload: RwLock::new(payload),
+            payload,
+            sender: tx,
         })
     }
 
-    // TODO: Now this functions is poorly implemented, we concat new_sst to
-    // snapshot, and upload it back in a whole.
-    // In more efficient way, we can create a new diff file, and do compaction 
in
-    // background to merge them to `snapshot`.
     pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> {
-        let mut payload = self.payload.write().await;
-        let mut tmp_ssts = payload.files.clone();
+        let new_sst_path = Path::from(format!("{}/{id}", self.sst_path));

Review Comment:
   If there are too many pending delta files, we should block writes here.



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -29,13 +39,16 @@ use crate::{
 
 pub const PREFIX_PATH: &str = "manifest";
 pub const SNAPSHOT_FILENAME: &str = "snapshot";
+pub const SST_PREFIX: &str = "sst";

Review Comment:
   These files should not be named `sst`. SST is an LSM term that stands for sorted string table, whereas these files are delta files applied on top of the snapshot.



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -140,4 +173,189 @@ impl Manifest {
             .cloned()
             .collect()
     }
+
+    async fn sender(&self, task: SstsMergeTask) {
+        if let Err(err) = self.sender.send(task).await {
+            error!("Failed to send merge ssts task, err: {:?}", err);
+        }
+    }
+}
+
+enum SstsMergeTask {
+    ForceMergeSsts,
+    MergeSsts(usize),
+}
+
+pub struct SstsMergeOptions {
+    channel_size: usize,
+    max_interval_seconds: usize,
+    merge_threshold: usize,
+}
+
+impl Default for SstsMergeOptions {
+    fn default() -> Self {
+        Self {
+            channel_size: 100,
+            max_interval_seconds: 5,
+            merge_threshold: 50,
+        }
+    }
+}
+
+struct SstsMerge {
+    snapshot_path: Path,
+    sst_path: Path,
+    store: ObjectStoreRef,
+    receiver: Receiver<SstsMergeTask>,
+    sst_num: usize,
+    merge_options: SstsMergeOptions,
+}
+
+impl SstsMerge {
+    async fn try_new(
+        snapshot_path: Path,
+        sst_path: Path,
+        store: ObjectStoreRef,
+        rx: Receiver<SstsMergeTask>,
+        merge_options: SstsMergeOptions,
+    ) -> Result<Self> {
+        let ssts_merge = Self {
+            snapshot_path,
+            sst_path,
+            store,
+            receiver: rx,
+            sst_num: 0,
+            merge_options,
+        };
+        // Merge all ssts when start
+        ssts_merge.merge_ssts().await?;
+
+        Ok(ssts_merge)
+    }
+
+    async fn run(&mut self) {
+        let interval = 
time::Duration::from_secs(self.merge_options.max_interval_seconds as u64);
+        let threshold = self.merge_options.merge_threshold;
+
+        loop {
+            match time::timeout(interval, self.receiver.recv()).await {
+                Ok(Some(SstsMergeTask::ForceMergeSsts)) => match 
self.merge_ssts().await {
+                    Ok(_) => {
+                        self.sst_num = 0;
+                    }
+                    Err(err) => {
+                        error!("Failed to force merge ssts, err: {:?}", err);
+                    }
+                },
+                Ok(Some(SstsMergeTask::MergeSsts(num))) => {
+                    self.sst_num += num;
+                    if self.sst_num >= threshold {
+                        match self.merge_ssts().await {
+                            Ok(_) => {
+                                self.sst_num = 0;
+                            }
+                            Err(err) => {
+                                error!("Failed to merge ssts, err: {:?}", err);
+                            }
+                        }
+                    }
+                }
+                Ok(None) => {
+                    // The channel is disconnected.
+                    info!("Channel disconnected, merge ssts task exit");
+                    break;
+                }
+                Err(_) => {
+                    info!("Timeout receive merge ssts task");
+                }
+            }
+        }
+    }
+
+    async fn merge_ssts(&self) -> Result<()> {
+        let meta_infos = self
+            .store
+            .list(Some(&self.sst_path))
+            .collect::<Vec<_>>()
+            .await;
+        if meta_infos.is_empty() {
+            return Ok(());
+        }
+
+        let mut paths = Vec::with_capacity(meta_infos.len());
+        for meta_info in meta_infos {
+            let path = meta_info
+                .context("failed to get path of manifest sst")?
+                .location;
+            paths.push(path);
+        }
+
+        let mut sst_files = Vec::with_capacity(paths.len());
+        for path in &paths {
+            let bytes = self
+                .store
+                .get(path)
+                .await
+                .context("failed to read sst file")?
+                .bytes()
+                .await
+                .context("failed to read sst file")?;
+            let pb_sst = pb_types::SstFile::decode(bytes).context("failed to 
decode sst file")?;
+            sst_files.push(SstFile::try_from(pb_sst)?);

Review Comment:
   We need to dedup files before merging, since there may be old delta files that failed to be deleted during the last merge.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to