This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 1c672d1 fix: build up incremental file groups (#273)
1c672d1 is described below
commit 1c672d14a5db7318811c8851b15ce0640f1e381f
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jan 29 18:32:24 2025 -0600
fix: build up incremental file groups (#273)
When incrementally fetching changed file groups from commit metadata, occurrences
of the same file group across commits should be merged so that all relevant log
files are included.
---
crates/core/src/file_group/base_file.rs | 15 +++
crates/core/src/file_group/builder.rs | 23 +++++
crates/core/src/file_group/file_slice.rs | 142 +++++++++++++++++++++++++++++
crates/core/src/file_group/log_file/mod.rs | 14 +++
crates/core/src/file_group/mod.rs | 19 ++++
crates/core/src/table/mod.rs | 29 +++++-
crates/core/src/timeline/mod.rs | 4 +-
7 files changed, 242 insertions(+), 4 deletions(-)
diff --git a/crates/core/src/file_group/base_file.rs
b/crates/core/src/file_group/base_file.rs
index c444234..4f35b87 100644
--- a/crates/core/src/file_group/base_file.rs
+++ b/crates/core/src/file_group/base_file.rs
@@ -19,6 +19,7 @@
use crate::error::CoreError;
use crate::storage::file_metadata::FileMetadata;
use crate::Result;
+use std::fmt::Display;
use std::str::FromStr;
/// Hudi Base file, part of a [FileSlice].
@@ -88,6 +89,20 @@ impl BaseFile {
}
}
+/// Renders a base file as `BaseFile: <file name>`.
+impl Display for BaseFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str("BaseFile: ")?;
+        f.write_str(&self.file_name())
+    }
+}
+
+/// Base files are equal exactly when their file names are equal.
+impl PartialEq for BaseFile {
+    fn eq(&self, other: &Self) -> bool {
+        other.file_name() == self.file_name()
+    }
+}
+
+impl Eq for BaseFile {}
+
impl FromStr for BaseFile {
type Err = CoreError;
diff --git a/crates/core/src/file_group/builder.rs
b/crates/core/src/file_group/builder.rs
index 035c681..9f2d0c3 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -23,6 +23,29 @@ use serde_json::{Map, Value};
use std::collections::HashSet;
use std::path::Path;
+/// Merges incoming [FileGroup]s into an existing collection of file groups.
+///
+/// When an incoming file group compares equal to one already in the
+/// collection, the two are combined rather than one replacing the other.
+pub trait FileGroupMerger {
+    /// Merges every file group yielded by `file_groups` into `self`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if an incoming file group cannot be merged into the
+    /// matching existing one.
+    fn merge<I>(&mut self, file_groups: I) -> Result<()>
+    where
+        I: IntoIterator<Item = FileGroup>;
+}
+
+impl FileGroupMerger for HashSet<FileGroup> {
+    /// Inserts each new file group, or merges it into the equal file group that
+    /// is already in the set.
+    ///
+    /// If a merge fails, the existing file group (possibly partially merged) is
+    /// put back into the set before the error is returned. The set therefore
+    /// never loses a file group it already held.
+    fn merge<I>(&mut self, file_groups: I) -> Result<()>
+    where
+        I: IntoIterator<Item = FileGroup>,
+    {
+        for file_group in file_groups {
+            match self.take(&file_group) {
+                Some(mut existing) => {
+                    // `take` removed `existing` from the set; reinsert it before
+                    // propagating any error so a failed merge does not drop it.
+                    let merged = existing.merge(&file_group);
+                    self.insert(existing);
+                    merged?;
+                }
+                None => {
+                    self.insert(file_group);
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
pub fn build_file_groups(commit_metadata: &Map<String, Value>) ->
Result<HashSet<FileGroup>> {
let partition_stats = commit_metadata
.get("partitionToWriteStats")
diff --git a/crates/core/src/file_group/file_slice.rs
b/crates/core/src/file_group/file_slice.rs
index 6ba2d0d..7218188 100644
--- a/crates/core/src/file_group/file_slice.rs
+++ b/crates/core/src/file_group/file_slice.rs
@@ -22,6 +22,7 @@ use crate::file_group::log_file::LogFile;
use crate::storage::Storage;
use crate::Result;
use std::collections::BTreeSet;
+use std::fmt::Display;
use std::path::PathBuf;
/// Within a [crate::file_group::FileGroup],
@@ -33,6 +34,24 @@ pub struct FileSlice {
pub partition_path: String,
}
+/// Renders the base file, the log files (in their `Debug` form), and the
+/// partition path of a file slice.
+impl Display for FileSlice {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let FileSlice {
+            base_file,
+            log_files,
+            partition_path,
+        } = self;
+        write!(
+            f,
+            "FileSlice {{ base_file: {}, log_files: {:?}, partition_path: {} }}",
+            base_file, log_files, partition_path
+        )
+    }
+}
+
+/// File slices are equal when they share the same base file and partition
+/// path; their log files are deliberately not compared.
+impl PartialEq for FileSlice {
+    fn eq(&self, other: &Self) -> bool {
+        self.partition_path == other.partition_path && self.base_file == other.base_file
+    }
+}
+
+impl Eq for FileSlice {}
+
impl FileSlice {
pub fn new(base_file: BaseFile, partition_path: String) -> Self {
Self {
@@ -47,6 +66,17 @@ impl FileSlice {
!self.log_files.is_empty()
}
+    /// Adds the log files of `other` to this file slice.
+    ///
+    /// `log_files` is a set, so a log file present in both slices is kept once.
+    ///
+    /// # Errors
+    ///
+    /// Returns [CoreError::FileGroup] if `other` has a different base file or
+    /// partition path than `self`.
+    pub fn merge(&mut self, other: &FileSlice) -> Result<()> {
+        if self == other {
+            for log_file in &other.log_files {
+                self.log_files.insert(log_file.clone());
+            }
+            return Ok(());
+        }
+        Err(CoreError::FileGroup(format!(
+            "Cannot merge different file slices: {self} and {other}"
+        )))
+    }
+
fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
let path = PathBuf::from(self.partition_path.as_str()).join(file_name);
path.to_str().map(|s| s.to_string()).ok_or_else(|| {
@@ -94,3 +124,115 @@ impl FileSlice {
Ok(())
}
}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::table::partition::EMPTY_PARTITION_PATH;
+    use std::str::FromStr;
+
+    /// Merging two slices with the same base file and partition path unions
+    /// their log files, keeping a log file present in both only once.
+    #[test]
+    fn test_file_slices_merge() -> Result<()> {
+        let base = BaseFile::from_str(
+            "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_0-7-24_20250109233025121.parquet",
+        )?;
+        let mut log_set1 = BTreeSet::new();
+        log_set1.insert(LogFile::from_str(
+            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.4_0-51-115",
+        )?);
+        log_set1.insert(LogFile::from_str(
+            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.2_0-51-115",
+        )?);
+
+        let mut log_set2 = BTreeSet::new();
+        log_set2.insert(LogFile::from_str(
+            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.3_0-51-115",
+        )?);
+        log_set2.insert(LogFile::from_str(
+            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115",
+        )?);
+        // log.2 is in both slices, so the merge has to deduplicate it.
+        log_set2.insert(LogFile::from_str(
+            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.2_0-51-115",
+        )?);
+
+        let mut slice1 = FileSlice {
+            base_file: base.clone(),
+            log_files: log_set1,
+            partition_path: EMPTY_PARTITION_PATH.to_string(),
+        };
+
+        let slice2 = FileSlice {
+            base_file: base,
+            log_files: log_set2,
+            partition_path: EMPTY_PARTITION_PATH.to_string(),
+        };
+
+        slice1.merge(&slice2)?;
+
+        // Verify merged result: 2 + 3 log files with one overlap => 4, in order.
+        assert_eq!(slice1.log_files.len(), 4);
+        let log_file_names = slice1
+            .log_files
+            .iter()
+            .map(|log| log.file_name())
+            .collect::<Vec<String>>();
+        assert_eq!(
+            log_file_names.as_slice(),
+            &[
+                ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115",
+                ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.2_0-51-115",
+                ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.3_0-51-115",
+                ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.4_0-51-115",
+            ]
+        );
+
+        Ok(())
+    }
+
+    /// Slices whose base files differ must not be merged.
+    #[test]
+    fn test_merge_different_base_files() -> Result<()> {
+        let mut slice1 = FileSlice {
+            base_file: BaseFile::from_str(
+                "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_0-7-24_20250109233025121.parquet",
+            )?,
+            log_files: BTreeSet::new(),
+            partition_path: EMPTY_PARTITION_PATH.to_string(),
+        };
+
+        let slice2 = FileSlice {
+            base_file: BaseFile::from_str(
+                "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_1-19-51_20250109233025121.parquet",
+            )?,
+            log_files: BTreeSet::new(),
+            partition_path: EMPTY_PARTITION_PATH.to_string(),
+        };
+
+        // Should return error for different base files
+        assert!(slice1.merge(&slice2).is_err());
+
+        Ok(())
+    }
+
+    /// Slices in different partitions must not be merged, even with the same
+    /// base file.
+    #[test]
+    fn test_merge_different_partition_paths() -> Result<()> {
+        let base = BaseFile::from_str(
+            "54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_1-19-51_20250109233025121.parquet",
+        )?;
+        let mut slice1 = FileSlice {
+            base_file: base.clone(),
+            log_files: BTreeSet::new(),
+            partition_path: "path/to/partition1".to_string(),
+        };
+
+        let slice2 = FileSlice {
+            base_file: base,
+            log_files: BTreeSet::new(),
+            partition_path: "path/to/partition2".to_string(),
+        };
+
+        // Should return error for different partition paths
+        assert!(slice1.merge(&slice2).is_err());
+
+        Ok(())
+    }
+}
diff --git a/crates/core/src/file_group/log_file/mod.rs
b/crates/core/src/file_group/log_file/mod.rs
index d473bc2..04e9577 100644
--- a/crates/core/src/file_group/log_file/mod.rs
+++ b/crates/core/src/file_group/log_file/mod.rs
@@ -20,6 +20,7 @@ use crate::error::CoreError;
use crate::storage::file_metadata::FileMetadata;
use crate::Result;
use std::cmp::Ordering;
+use std::fmt::Display;
use std::str::FromStr;
mod log_block;
@@ -142,6 +143,12 @@ impl TryFrom<FileMetadata> for LogFile {
}
}
+/// Renders a log file as `LogFile: <file name>`.
+impl Display for LogFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str("LogFile: ")?;
+        f.write_str(&self.file_name())
+    }
+}
+
impl PartialEq for LogFile {
fn eq(&self, other: &Self) -> bool {
self.file_name() == other.file_name()
@@ -171,6 +178,13 @@ impl Ord for LogFile {
mod tests {
use super::*;
+    /// The `Display` output of a log file is `LogFile: ` followed by its full
+    /// file name.
+    #[test]
+    fn test_log_file_name_in_formatted_str() {
+        let filename =
+            ".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115";
+        let log_file = LogFile::from_str(filename).unwrap();
+        assert_eq!(log_file.to_string(), format!("LogFile: {filename}"));
+    }
+
#[test]
fn test_valid_filename_parsing() {
let filename =
".54e9a5e9-ee5d-4ed2-acee-720b5810d380-0_20250109233025121.log.1_0-51-115";
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index 62cc1e1..cff9a78 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -97,6 +97,25 @@ impl FileGroup {
Ok(file_group)
}
+    /// Merges the file slices of `other` into this file group.
+    ///
+    /// A slice at a commit timestamp not yet present in `self` is cloned in.
+    /// A slice at an existing timestamp is combined via [FileSlice::merge].
+    ///
+    /// # Errors
+    ///
+    /// Returns [CoreError::FileGroup] if `other` is not equal to `self`, or if
+    /// merging two slices at the same timestamp fails. In the latter case,
+    /// slices handled before the failure remain merged into `self`.
+    pub fn merge(&mut self, other: &FileGroup) -> Result<()> {
+        if self != other {
+            return Err(CoreError::FileGroup(format!(
+                "Cannot merge different file groups: {self} and {other}",
+            )));
+        }
+
+        for (timestamp, incoming) in other.file_slices.iter() {
+            match self.file_slices.get_mut(timestamp) {
+                Some(current) => current.merge(incoming)?,
+                None => {
+                    self.file_slices.insert(timestamp.clone(), incoming.clone());
+                }
+            }
+        }
+        Ok(())
+    }
+
/// Add a [BaseFile] based on the file name to the corresponding
[FileSlice] in the [FileGroup].
pub fn add_base_file_from_name(&mut self, file_name: &str) ->
Result<&Self> {
let base_file = BaseFile::from_str(file_name)?;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 511f261..82f6243 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -978,12 +978,29 @@ mod tests {
     async fn hudi_table_get_file_slices_between_timestamps() {
         let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let file_slices = hudi_table
+        let mut file_slices = hudi_table
             .get_file_slices_between(None, Some("20250121000656060"))
             .await
             .unwrap();
         assert_eq!(file_slices.len(), 3);
-        // TODO: Add more assertions
+
+        // Sort by partition path so the checks below do not depend on the
+        // returned order; comparing borrowed keys avoids cloning each String.
+        file_slices.sort_unstable_by(|a, b| a.partition_path.cmp(&b.partition_path));
+
+        // Expected (partition path, file id, number of log files) per slice.
+        let expected = [
+            ("10", "92e64357-e4d1-4639-a9d3-c3535829d0aa-0", 1),
+            ("20", "d49ae379-4f20-4549-8e23-a5f9604412c0-0", 0),
+            ("30", "de3550df-e12c-4591-9335-92ff992258a2-0", 0),
+        ];
+        for (file_slice, (partition_path, file_id, num_log_files)) in
+            file_slices.iter().zip(expected)
+        {
+            assert_eq!(file_slice.partition_path, partition_path);
+            assert_eq!(file_slice.file_id(), file_id);
+            assert_eq!(file_slice.log_files.len(), num_log_files);
+        }
     }
#[tokio::test]
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 9722393..67dad41 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -21,7 +21,7 @@ pub(crate) mod selector;
use crate::config::HudiConfigs;
use crate::error::CoreError;
-use crate::file_group::builder::{build_file_groups,
build_replaced_file_groups};
+use crate::file_group::builder::{build_file_groups,
build_replaced_file_groups, FileGroupMerger};
use crate::file_group::FileGroup;
use crate::storage::Storage;
use crate::timeline::selector::TimelineSelector;
@@ -220,7 +220,7 @@ impl Timeline {
let commits = selector.select(self)?;
for commit in commits {
let commit_metadata = self.get_commit_metadata(&commit).await?;
- file_groups.extend(build_file_groups(&commit_metadata)?);
+ file_groups.merge(build_file_groups(&commit_metadata)?)?;
if commit.is_replacecommit() {
replaced_file_groups.extend(build_replaced_file_groups(&commit_metadata)?);