jiacai2050 commented on code in PR #1599:
URL: https://github.com/apache/horaedb/pull/1599#discussion_r1855706589


##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -396,11 +381,118 @@ async fn read_delta_file(store: ObjectStoreRef, 
sst_path: Path) -> Result<SstFil
     Ok(sst)
 }
 
-async fn delete_delta_file(store: ObjectStoreRef, path: Path) -> Result<()> {
+async fn delete_delta_file(store: &ObjectStoreRef, path: &Path) -> Result<()> {
     store
-        .delete(&path)
+        .delete(path)
         .await
         .with_context(|| format!("Failed to delete delta files, 
path:{path}"))?;
 
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use std::{sync::Arc, thread::sleep};
+
+    use object_store::local::LocalFileSystem;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_find_manifest() {
+        let root_dir = temp_dir::TempDir::new().unwrap();
+        let runtime = tokio::runtime::Runtime::new().unwrap();
+        let store = Arc::new(LocalFileSystem::new());
+
+        let manifest = Manifest::try_new(
+            root_dir.path().to_string_lossy().to_string(),
+            store,
+            Arc::new(runtime),
+            ManifestMergeOptions::default(),
+        )
+        .await
+        .unwrap();
+
+        for i in 0..20 {
+            let time_range = TimeRange::new(i.into(), (i + 1).into());

Review Comment:
   ```suggestion
               let time_range = (i, i+1).into();
   ```



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -396,11 +381,118 @@ async fn read_delta_file(store: ObjectStoreRef, 
sst_path: Path) -> Result<SstFil
     Ok(sst)
 }
 
-async fn delete_delta_file(store: ObjectStoreRef, path: Path) -> Result<()> {
+async fn delete_delta_file(store: &ObjectStoreRef, path: &Path) -> Result<()> {
     store
-        .delete(&path)
+        .delete(path)
         .await
         .with_context(|| format!("Failed to delete delta files, 
path:{path}"))?;
 
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use std::{sync::Arc, thread::sleep};
+
+    use object_store::local::LocalFileSystem;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_find_manifest() {
+        let root_dir = temp_dir::TempDir::new().unwrap();
+        let runtime = tokio::runtime::Runtime::new().unwrap();
+        let store = Arc::new(LocalFileSystem::new());
+
+        let manifest = Manifest::try_new(
+            root_dir.path().to_string_lossy().to_string(),
+            store,
+            Arc::new(runtime),
+            ManifestMergeOptions::default(),
+        )
+        .await
+        .unwrap();
+
+        for i in 0..20 {
+            let time_range = TimeRange::new(i.into(), (i + 1).into());
+            let meta = FileMeta {
+                max_sequence: i as u64,
+                num_rows: i as u32,
+                size: i as u32,
+                time_range,
+            };
+            manifest.add_file(i as u64, meta).await.unwrap();
+        }
+
+        let find_range = TimeRange::new(10.into(), 15.into());
+        let mut ssts = manifest.find_ssts(&find_range).await;
+
+        let mut expected_ssts = (10..15)
+            .map(|i| {
+                let id = i as u64;
+                let time_range = TimeRange::new(i.into(), (i + 1).into());
+                let meta = FileMeta {
+                    max_sequence: i as u64,
+                    num_rows: i as u32,
+                    size: i as u32,
+                    time_range,
+                };
+                SstFile { id, meta }
+            })
+            .collect::<Vec<_>>();
+
+        expected_ssts.sort_by(|a, b| a.id.cmp(&b.id));
+        ssts.sort_by(|a, b| a.id.cmp(&b.id));
+        assert_eq!(expected_ssts, ssts);
+    }
+
+    #[tokio::test]
+    async fn test_merge_manifest() {
+        let root_dir = temp_dir::TempDir::new()
+            .unwrap()
+            .path()
+            .to_string_lossy()
+            .to_string();
+        let snapshot_path = 
Path::from(format!("{root_dir}/{SNAPSHOT_FILENAME}"));
+        let runtime = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(4)
+            .enable_all()
+            .build()
+            .unwrap();
+        let store: ObjectStoreRef = Arc::new(LocalFileSystem::new());
+
+        let manifest = Manifest::try_new(
+            root_dir,
+            store.clone(),
+            Arc::new(runtime),
+            ManifestMergeOptions {
+                merge_interval_seconds: 1,
+                ..Default::default()
+            },
+        )
+        .await
+        .unwrap();
+
+        // Add manifest files
+        for i in 0..20 {
+            let time_range = TimeRange::new(i.into(), (i + 1).into());
+            let meta = FileMeta {
+                max_sequence: i as u64,
+                num_rows: i as u32,
+                size: i as u32,
+                time_range,
+            };
+            manifest.add_file(i as u64, meta).await.unwrap();
+        }
+
+        // Wait for merge manifest to finish
+        sleep(Duration::from_secs(1));

Review Comment:
   After this, you should ensure no delta files exists any more.



##########
horaedb/metric_engine/src/manifest.rs:
##########
@@ -285,25 +273,24 @@ impl ManifestMerger {
             .map(|value| {
                 value
                     .map(|v| v.location)
-                    .context("failed to get delta file path")
+                    .with_context(|| format!("Failed to list delta files, 
path:{}", self.delta_dir))
             })
             .try_collect::<Vec<_>>()
             .await?;
         if paths.is_empty() {
             return Ok(());
         }
 
-        let mut stream_read = FuturesUnordered::new();
-        for path in paths.clone() {
-            let store = self.store.clone();
-            // TODO: use thread pool to read manifest files
-            let handle = tokio::spawn(async move { read_delta_file(store, 
path).await });
-            stream_read.push(handle);
-        }
-        let mut delta_files = Vec::with_capacity(stream_read.len());
-        while let Some(res) = stream_read.next().await {
-            let res = res.context("Failed to join read delta task")??;
-            delta_files.push(res);
+        let (_, results) = TokioScope::scope_and_block(|scope| {
+            for path in &paths {
+                scope.spawn(async move { read_delta_file(&self.store, 
path).await });

Review Comment:
   `move` is not required now.



##########
horaedb/metric_engine/src/sst.rs:
##########
@@ -31,7 +31,7 @@ pub const PREFIX_PATH: &str = "data";
 
 pub type FileId = u64;
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq)]

Review Comment:
   `PartialEq`, `Eq` are usually implemented at the same if possible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to