This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c672d1  fix: build up incremental file groups (#273)
1c672d1 is described below

commit 1c672d14a5db7318811c8851b15ce0640f1e381f
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jan 29 18:32:24 2025 -0600

    fix: build up incremental file groups (#273)
    
    When incrementally fetching changed file groups from commit metadata, occurrences of the
    same file group across commits should be merged so that all relevant log files are included.
---
 crates/core/src/file_group/base_file.rs    |  15 +++
 crates/core/src/file_group/builder.rs      |  23 +++++
 crates/core/src/file_group/file_slice.rs   | 142 +++++++++++++++++++++++++++++
 crates/core/src/file_group/log_file/mod.rs |  14 +++
 crates/core/src/file_group/mod.rs          |  19 ++++
 crates/core/src/table/mod.rs               |  29 +++++-
 crates/core/src/timeline/mod.rs            |   4 +-
 7 files changed, 242 insertions(+), 4 deletions(-)

diff --git a/crates/core/src/file_group/base_file.rs 
b/crates/core/src/file_group/base_file.rs
index c444234..4f35b87 100644
--- a/crates/core/src/file_group/base_file.rs
+++ b/crates/core/src/file_group/base_file.rs
@@ -19,6 +19,7 @@
 use crate::error::CoreError;
 use crate::storage::file_metadata::FileMetadata;
 use crate::Result;
+use std::fmt::Display;
 use std::str::FromStr;
 
 /// Hudi Base file, part of a [FileSlice].
@@ -88,6 +89,20 @@ impl BaseFile {
     }
 }
 
+impl Display for BaseFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "BaseFile: {}", self.file_name())
+    }
+}
+
+impl PartialEq for BaseFile {
+    fn eq(&self, other: &Self) -> bool {
+        self.file_name() == other.file_name()
+    }
+}
+
+impl Eq for BaseFile {}
+
 impl FromStr for BaseFile {
     type Err = CoreError;
 
diff --git a/crates/core/src/file_group/builder.rs 
b/crates/core/src/file_group/builder.rs
index 035c681..9f2d0c3 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -23,6 +23,29 @@ use serde_json::{Map, Value};
 use std::collections::HashSet;
 use std::path::Path;
 
+pub trait FileGroupMerger {
+    fn merge<I>(&mut self, file_groups: I) -> Result<()>
+    where
+        I: IntoIterator<Item = FileGroup>;
+}
+
+impl FileGroupMerger for HashSet<FileGroup> {
+    fn merge<I>(&mut self, file_groups: I) -> Result<()>
+    where
+        I: IntoIterator<Item = FileGroup>,
+    {
+        for file_group in file_groups {
+            if let Some(mut existing) = self.take(&file_group) {
+                existing.merge(&file_group)?;
+                self.insert(existing);
+            } else {
+                self.insert(file_group);
+            }
+        }
+        Ok(())
+    }
+}
+
 pub fn build_file_groups(commit_metadata: &Map<String, Value>) -> 
Result<HashSet<FileGroup>> {
     let partition_stats = commit_metadata
         .get("partitionToWriteStats")
diff --git a/crates/core/src/file_group/file_slice.rs 
b/crates/core/src/file_group/file_slice.rs
index 6ba2d0d..7218188 100644
--- a/crates/core/src/file_group/file_slice.rs
+++ b/crates/core/src/file_group/file_slice.rs
@@ -22,6 +22,7 @@ use crate::file_group::log_file::LogFile;
 use crate::storage::Storage;
 use crate::Result;
 use std::collections::BTreeSet;
+use std::fmt::Display;
 use std::path::PathBuf;
 
 /// Within a [crate::file_group::FileGroup],
@@ -33,6 +34,24 @@ pub struct FileSlice {
     pub partition_path: String,
 }
 
+impl Display for FileSlice {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "FileSlice {{ base_file: {}, log_files: {:?}, partition_path: {} 
}}",
+            self.base_file, self.log_files, self.partition_path
+        )
+    }
+}
+
+impl PartialEq for FileSlice {
+    fn eq(&self, other: &Self) -> bool {
+        self.base_file == other.base_file && self.partition_path == 
other.partition_path
+    }
+}
+
+impl Eq for FileSlice {}
+
 impl FileSlice {
     pub fn new(base_file: BaseFile, partition_path: String) -> Self {
         Self {
@@ -47,6 +66,17 @@ impl FileSlice {
         !self.log_files.is_empty()
     }
 
+    pub fn merge(&mut self, other: &FileSlice) -> Result<()> {
+        if self != other {
+            return Err(CoreError::FileGroup(format!(
+                "Cannot merge different file slices: {self} and {other}"
+            )));
+        }
+        self.log_files.extend(other.log_files.iter().cloned());
+
+        Ok(())
+    }
+
     fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
         let path = PathBuf::from(self.partition_path.as_str()).join(file_name);
         path.to_str().map(|s| s.to_string()).ok_or_else(|| {
@@ -94,3 +124,115 @@ impl FileSlice {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::table::partition::EMPTY_PARTITION_PATH;
+    use std::str::FromStr;
+
+    #[test]
+    fn test_file_slices_merge() -> Result<()> {
+        let base = BaseFile::from_str(
+            
"54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_0-7-24_20250109233025121.parquet",
+        )?;
+        let mut log_set1 = BTreeSet::new();
+        log_set1.insert(LogFile::from_str(
+            
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.4_0-51-115",
+        )?);
+        log_set1.insert(LogFile::from_str(
+            
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.2_0-51-115",
+        )?);
+
+        let mut log_set2 = BTreeSet::new();
+        log_set2.insert(LogFile::from_str(
+            
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.3_0-51-115",
+        )?);
+        log_set2.insert(LogFile::from_str(
+            
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115",
+        )?);
+        log_set1.insert(LogFile::from_str(
+            
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.2_0-51-115",
+        )?);
+
+        let mut slice1 = FileSlice {
+            base_file: base.clone(),
+            log_files: log_set1,
+            partition_path: EMPTY_PARTITION_PATH.to_string(),
+        };
+
+        let slice2 = FileSlice {
+            base_file: base,
+            log_files: log_set2,
+            partition_path: EMPTY_PARTITION_PATH.to_string(),
+        };
+
+        slice1.merge(&slice2)?;
+
+        // Verify merged result
+        assert_eq!(slice1.log_files.len(), 4);
+        let log_file_names = slice1
+            .log_files
+            .iter()
+            .map(|log| log.file_name())
+            .collect::<Vec<String>>();
+        assert_eq!(
+            log_file_names.as_slice(),
+            &[
+                
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115",
+                
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.2_0-51-115",
+                
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.3_0-51-115",
+                
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.4_0-51-115",
+            ]
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_merge_different_base_files() -> Result<()> {
+        let mut slice1 = FileSlice {
+            base_file: BaseFile::from_str(
+                
"54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_0-7-24_20250109233025121.parquet",
+            )?,
+            log_files: BTreeSet::new(),
+            partition_path: EMPTY_PARTITION_PATH.to_string(),
+        };
+
+        let slice2 = FileSlice {
+            base_file: BaseFile::from_str(
+                
"54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_1-19-51_20250109233025121.parquet",
+            )?,
+            log_files: BTreeSet::new(),
+            partition_path: EMPTY_PARTITION_PATH.to_string(),
+        };
+
+        // Should return error for different base files
+        assert!(slice1.merge(&slice2).is_err());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_merge_different_partition_paths() -> Result<()> {
+        let base = BaseFile::from_str(
+            
"54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_1-19-51_20250109233025121.parquet",
+        )?;
+        let mut slice1 = FileSlice {
+            base_file: base.clone(),
+            log_files: BTreeSet::new(),
+            partition_path: "path/to/partition1".to_string(),
+        };
+
+        let slice2 = FileSlice {
+            base_file: base,
+            log_files: BTreeSet::new(),
+            partition_path: "path/to/partition2".to_string(),
+        };
+
+        // Should return error for different partition paths
+        assert!(slice1.merge(&slice2).is_err());
+
+        Ok(())
+    }
+}
diff --git a/crates/core/src/file_group/log_file/mod.rs 
b/crates/core/src/file_group/log_file/mod.rs
index d473bc2..04e9577 100644
--- a/crates/core/src/file_group/log_file/mod.rs
+++ b/crates/core/src/file_group/log_file/mod.rs
@@ -20,6 +20,7 @@ use crate::error::CoreError;
 use crate::storage::file_metadata::FileMetadata;
 use crate::Result;
 use std::cmp::Ordering;
+use std::fmt::Display;
 use std::str::FromStr;
 
 mod log_block;
@@ -142,6 +143,12 @@ impl TryFrom<FileMetadata> for LogFile {
     }
 }
 
+impl Display for LogFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "LogFile: {}", self.file_name())
+    }
+}
+
 impl PartialEq for LogFile {
     fn eq(&self, other: &Self) -> bool {
         self.file_name() == other.file_name()
@@ -171,6 +178,13 @@ impl Ord for LogFile {
 mod tests {
     use super::*;
 
+    #[test]
+    fn test_log_file_name_in_formatted_str() {
+        let filename = 
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115";
+        let log_file = LogFile::from_str(filename).unwrap();
+        assert!(format!("{}", log_file).contains(filename));
+    }
+
     #[test]
     fn test_valid_filename_parsing() {
         let filename = 
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115";
diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
index 62cc1e1..cff9a78 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -97,6 +97,25 @@ impl FileGroup {
         Ok(file_group)
     }
 
+    pub fn merge(&mut self, other: &FileGroup) -> Result<()> {
+        if self != other {
+            return Err(CoreError::FileGroup(format!(
+                "Cannot merge different file groups: {self} and {other}",
+            )));
+        }
+
+        for (commit_timestamp, other_file_slice) in other.file_slices.iter() {
+            if let Some(existing_file_slice) = 
self.file_slices.get_mut(commit_timestamp) {
+                existing_file_slice.merge(other_file_slice)?;
+            } else {
+                self.file_slices
+                    .insert(commit_timestamp.clone(), 
other_file_slice.clone());
+            }
+        }
+
+        Ok(())
+    }
+
     /// Add a [BaseFile] based on the file name to the corresponding 
[FileSlice] in the [FileGroup].
     pub fn add_base_file_from_name(&mut self, file_name: &str) -> 
Result<&Self> {
         let base_file = BaseFile::from_str(file_name)?;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 511f261..82f6243 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -978,12 +978,37 @@ mod tests {
     async fn hudi_table_get_file_slices_between_timestamps() {
         let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let file_slices = hudi_table
+        let mut file_slices = hudi_table
             .get_file_slices_between(None, Some("20250121000656060"))
             .await
             .unwrap();
         assert_eq!(file_slices.len(), 3);
-        // TODO: Add more assertions
+
+        file_slices.sort_unstable_by_key(|f| f.partition_path.clone());
+
+        let file_slice_0 = &file_slices[0];
+        assert_eq!(file_slice_0.partition_path, "10");
+        assert_eq!(
+            file_slice_0.file_id(),
+            "92e64357-e4d1-4639-a9d3-c3535829d0aa-0"
+        );
+        assert_eq!(file_slice_0.log_files.len(), 1);
+
+        let file_slice_1 = &file_slices[1];
+        assert_eq!(file_slice_1.partition_path, "20");
+        assert_eq!(
+            file_slice_1.file_id(),
+            "d49ae379-4f20-4549-8e23-a5f9604412c0-0"
+        );
+        assert!(file_slice_1.log_files.is_empty());
+
+        let file_slice_2 = &file_slices[2];
+        assert_eq!(file_slice_2.partition_path, "30");
+        assert_eq!(
+            file_slice_2.file_id(),
+            "de3550df-e12c-4591-9335-92ff992258a2-0"
+        );
+        assert!(file_slice_2.log_files.is_empty());
     }
 
     #[tokio::test]
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 9722393..67dad41 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -21,7 +21,7 @@ pub(crate) mod selector;
 
 use crate::config::HudiConfigs;
 use crate::error::CoreError;
-use crate::file_group::builder::{build_file_groups, 
build_replaced_file_groups};
+use crate::file_group::builder::{build_file_groups, 
build_replaced_file_groups, FileGroupMerger};
 use crate::file_group::FileGroup;
 use crate::storage::Storage;
 use crate::timeline::selector::TimelineSelector;
@@ -220,7 +220,7 @@ impl Timeline {
         let commits = selector.select(self)?;
         for commit in commits {
             let commit_metadata = self.get_commit_metadata(&commit).await?;
-            file_groups.extend(build_file_groups(&commit_metadata)?);
+            file_groups.merge(build_file_groups(&commit_metadata)?)?;
 
             if commit.is_replacecommit() {
                 
replaced_file_groups.extend(build_replaced_file_groups(&commit_metadata)?);

Reply via email to