jiacai2050 commented on code in PR #1596:
URL: https://github.com/apache/horaedb/pull/1596#discussion_r1852019143


##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -94,39 +141,40 @@ impl Manifest {
         Ok(Self {
             path,
             snapshot_path,
+            delta_manifest_path,
             store,
+            manifest_merge,
             payload: RwLock::new(payload),
         })
     }
 
-    // TODO: Now this functions is poorly implemented, we concat new_sst to
-    // snapshot, and upload it back in a whole.
-    // In more efficient way, we can create a new diff file, and do compaction 
in
-    // background to merge them to `snapshot`.
     pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> {
-        let mut payload = self.payload.write().await;
-        let mut tmp_ssts = payload.files.clone();
+        // Schedule force merge manifest
+        self.manifest_merge.schedule_force_merge().await?;
+
+        let new_sst_path = Path::from(format!("{}/{id}", 
self.delta_manifest_path));
         let new_sst = SstFile { id, meta };
-        tmp_ssts.push(new_sst.clone());
-        let pb_manifest = pb_types::Manifest {
-            files: tmp_ssts.into_iter().map(|f| f.into()).collect::<Vec<_>>(),
-        };
 
-        let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
-        pb_manifest
+        let new_sst_payload = pb_types::SstFile::from(new_sst.clone());
+        let mut buf: Vec<u8> = 
Vec::with_capacity(new_sst_payload.encoded_len());
+        new_sst_payload
             .encode(&mut buf)
-            .context("failed to encode manifest")?;
+            .context("failed to encode manifest file")?;
         let put_payload = PutPayload::from_bytes(Bytes::from(buf));
 
-        // 1. Persist the snapshot
+        // 1. Persist the manifest file
         self.store
-            .put(&self.snapshot_path, put_payload)
+            .put(&new_sst_path, put_payload)
             .await
-            .context("Failed to update manifest")?;
+            .context("Failed to update manifest file")?;
 
         // 2. Update cached payload
+        let mut payload = self.payload.write().await;

Review Comment:
   Wrap step 2 in `{ .. }` so that the write lock is released quickly.



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -141,3 +189,241 @@ impl Manifest {
             .collect()
     }
 }
+
+enum ManifestMergeTask {
+    ForceMergeManifest,
+    MergeManifest,
+}
+
+#[derive(Clone)]
+pub struct ManifestMergeOptions {
+    channel_size: usize,
+    merge_interval_seconds: usize,
+    merge_threshold: usize,
+    force_merge_threshold: usize,
+}
+
+impl Default for ManifestMergeOptions {
+    fn default() -> Self {
+        Self {
+            channel_size: 100,
+            merge_interval_seconds: 5,
+            merge_threshold: 20,
+            force_merge_threshold: 50,
+        }
+    }
+}
+
+struct ManifestMerge {
+    snapshot_path: Path,
+    delta_manifest_path: Path,
+    store: ObjectStoreRef,
+    sender: Sender<ManifestMergeTask>,
+    receiver: RwLock<Receiver<ManifestMergeTask>>,
+    sst_num: AtomicUsize,
+    merge_options: ManifestMergeOptions,
+}
+
+impl ManifestMerge {
+    async fn try_new(
+        snapshot_path: Path,
+        delta_manifest_path: Path,
+        store: ObjectStoreRef,
+        merge_options: ManifestMergeOptions,
+    ) -> Result<Arc<Self>> {
+        let (tx, rx) = mpsc::channel(merge_options.channel_size);
+        let manifest_merge = Self {
+            snapshot_path,
+            delta_manifest_path,
+            store,
+            sender: tx,
+            receiver: RwLock::new(rx),
+            sst_num: AtomicUsize::new(0),
+            merge_options,
+        };
+        // Merge all manifest files when start
+        manifest_merge.merge_manifest().await?;
+
+        Ok(Arc::new(manifest_merge))
+    }
+
+    async fn run(&self) {
+        let merge_interval = 
Duration::from_secs(self.merge_options.merge_interval_seconds as u64);
+        let mut receiver = self.receiver.write().await;
+        loop {
+            match timeout(merge_interval, receiver.recv()).await {
+                Ok(Some(ManifestMergeTask::ForceMergeManifest)) => {
+                    if self.sst_num.load(Ordering::Relaxed)
+                        < self.merge_options.force_merge_threshold
+                    {
+                        continue;
+                    }
+                    match self.merge_manifest().await {
+                        Ok(_) => {
+                            self.sst_num.store(0, Ordering::Relaxed);

Review Comment:
   Between L261 and L263, a new delta file may be created, so we can't reset 
this to 0 directly.
   
   `merge_manifest` could return the number of deltas merged, so we can 
subtract it here.



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -29,11 +46,14 @@ use crate::{
 
 pub const PREFIX_PATH: &str = "manifest";
 pub const SNAPSHOT_FILENAME: &str = "snapshot";
+pub const DELTA_MANIFEST_PREFIX: &str = "delta_manifest";

Review Comment:
   ```suggestion
   pub const DELTA_MANIFEST_PREFIX: &str = "delta";
   ```



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -94,39 +141,40 @@ impl Manifest {
         Ok(Self {
             path,
             snapshot_path,
+            delta_manifest_path,
             store,
+            manifest_merge,
             payload: RwLock::new(payload),
         })
     }
 
-    // TODO: Now this functions is poorly implemented, we concat new_sst to
-    // snapshot, and upload it back in a whole.
-    // In more efficient way, we can create a new diff file, and do compaction 
in
-    // background to merge them to `snapshot`.
     pub async fn add_file(&self, id: FileId, meta: FileMeta) -> Result<()> {
-        let mut payload = self.payload.write().await;
-        let mut tmp_ssts = payload.files.clone();
+        // Schedule force merge manifest
+        self.manifest_merge.schedule_force_merge().await?;
+
+        let new_sst_path = Path::from(format!("{}/{id}", 
self.delta_manifest_path));
         let new_sst = SstFile { id, meta };
-        tmp_ssts.push(new_sst.clone());
-        let pb_manifest = pb_types::Manifest {
-            files: tmp_ssts.into_iter().map(|f| f.into()).collect::<Vec<_>>(),
-        };
 
-        let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
-        pb_manifest
+        let new_sst_payload = pb_types::SstFile::from(new_sst.clone());
+        let mut buf: Vec<u8> = 
Vec::with_capacity(new_sst_payload.encoded_len());
+        new_sst_payload
             .encode(&mut buf)
-            .context("failed to encode manifest")?;
+            .context("failed to encode manifest file")?;
         let put_payload = PutPayload::from_bytes(Bytes::from(buf));
 
-        // 1. Persist the snapshot
+        // 1. Persist the manifest file
         self.store
-            .put(&self.snapshot_path, put_payload)
+            .put(&new_sst_path, put_payload)
             .await
-            .context("Failed to update manifest")?;
+            .context("Failed to update manifest file")?;
 
         // 2. Update cached payload
+        let mut payload = self.payload.write().await;
         payload.files.push(new_sst);
 
+        // 3. Schedule manifest merge
+        self.manifest_merge.schedule_merge().await;

Review Comment:
   Why call this again?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to