This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new aaade0f feat: add table v8+ timeline support for loading old and new
table versions (#395)
aaade0f is described below
commit aaade0fde42606aa2b2bcf668dc07c13d7e9b835
Author: Sagar Sumit <[email protected]>
AuthorDate: Sun Dec 7 10:30:12 2025 +0530
feat: add table v8+ timeline support for loading old and new table versions
(#395)
---------
Signed-off-by: Sagar Sumit <[email protected]>
Co-authored-by: Shiyan Xu <[email protected]>
---
crates/core/Cargo.toml | 3 +
crates/core/src/config/internal.rs | 21 +-
crates/core/src/config/read.rs | 2 +-
crates/core/src/config/table.rs | 20 +
crates/core/src/file_group/reader.rs | 8 +-
crates/core/src/lib.rs | 6 +-
crates/core/src/metadata/commit.rs | 230 +++++++++-
crates/core/src/timeline/builder.rs | 344 +++++++++++++++
crates/core/src/timeline/instant.rs | 181 +++++++-
crates/core/src/timeline/loader.rs | 466 +++++++++++++++++++++
crates/core/src/timeline/lsm_tree.rs | 109 +++++
crates/core/src/timeline/mod.rs | 333 ++++++++++++---
crates/core/src/timeline/selector.rs | 172 +++++++-
.../core/tests/data/commit_metadata/v6_commit.json | 38 ++
.../tests/data/commit_metadata/v8_deltacommit.avro | Bin 0 -> 4483 bytes
.../.hoodie/hoodie.properties | 23 +
.../.hoodie/hoodie.properties | 23 +
crates/datafusion/src/lib.rs | 64 +--
18 files changed, 1909 insertions(+), 134 deletions(-)
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index e8bde12..eb958a9 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -27,6 +27,9 @@ description.workspace = true
homepage.workspace = true
repository.workspace = true
+[lints.rust]
+unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tarpaulin_include)'] }
+
[dependencies]
# arrow
arrow = { workspace = true }
diff --git a/crates/core/src/config/internal.rs
b/crates/core/src/config/internal.rs
index 61f74c6..45db8b4 100644
--- a/crates/core/src/config/internal.rs
+++ b/crates/core/src/config/internal.rs
@@ -37,18 +37,31 @@ use crate::config::{ConfigParser, HudiConfigValue};
/// use hudi_core::table::Table as HudiTable;
///
/// let options = [(SkipConfigValidation, "true")];
-/// HudiTable::new_with_options("/tmp/hudi_data", options)
+/// HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
/// ```
///
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiInternalConfig {
SkipConfigValidation,
+ /// Enable reading archived timeline (v1) and LSM history (v2).
+ ///
+ /// When enabled, timeline queries with time range filters will include
archived instants
+ /// in addition to active instants. When disabled (default), only active
timeline is read.
+ ///
+ /// Note: Archived instants are only loaded when BOTH conditions are met:
+ /// 1. This config is set to `true`
+ /// 2. The query specifies a time range filter (start or end timestamp)
+ ///
+ /// Queries without time filters (e.g., `get_completed_commits()`) will
never load
+ /// archived instants, regardless of this setting.
+ TimelineArchivedReadEnabled,
}
impl AsRef<str> for HudiInternalConfig {
fn as_ref(&self) -> &str {
match self {
Self::SkipConfigValidation =>
"hoodie.internal.skip.config.validation",
+ Self::TimelineArchivedReadEnabled =>
"hoodie.internal.timeline.archived.enabled",
}
}
}
@@ -65,6 +78,7 @@ impl ConfigParser for HudiInternalConfig {
fn default_value(&self) -> Option<HudiConfigValue> {
match self {
Self::SkipConfigValidation =>
Some(HudiConfigValue::Boolean(false)),
+ Self::TimelineArchivedReadEnabled =>
Some(HudiConfigValue::Boolean(false)),
}
}
@@ -80,6 +94,11 @@ impl ConfigParser for HudiInternalConfig {
bool::from_str(v).map_err(|e| ParseBool(self.key(),
v.to_string(), e))
})
.map(HudiConfigValue::Boolean),
+ Self::TimelineArchivedReadEnabled => get_result
+ .and_then(|v| {
+ bool::from_str(v).map_err(|e| ParseBool(self.key(),
v.to_string(), e))
+ })
+ .map(HudiConfigValue::Boolean),
}
}
}
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index dae0a44..4aad618 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -37,7 +37,7 @@ use crate::config::{ConfigParser, HudiConfigValue};
/// use hudi_core::table::Table as HudiTable;
///
/// let options = [(InputPartitions, "2")];
-/// HudiTable::new_with_options("/tmp/hudi_data", options)
+/// HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
/// ```
///
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 06f3dec..e3fd273 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -117,6 +117,17 @@ pub enum HudiTableConfig {
///
/// - [`TimelineTimezoneValue`] - Possible values for this configuration.
TimelineTimezone,
+
+ /// Folder for archived timeline files for layout v1 (default:
.hoodie/archived)
+ ArchiveLogFolder,
+
+ /// Path for timeline directory for layout v2 (v8+), relative to .hoodie/
(default: timeline)
+ /// The full path will be `.hoodie/{TimelinePath}`
+ TimelinePath,
+
+ /// Path for LSM timeline history for layout v2, relative to timeline path
(default: history)
+ /// The full path will be `.hoodie/{TimelinePath}/{TimelineHistoryPath}`
+ TimelineHistoryPath,
}
impl AsRef<str> for HudiTableConfig {
@@ -141,6 +152,9 @@ impl AsRef<str> for HudiTableConfig {
Self::TableVersion => "hoodie.table.version",
Self::TimelineLayoutVersion => "hoodie.timeline.layout.version",
Self::TimelineTimezone => "hoodie.table.timeline.timezone",
+ Self::ArchiveLogFolder => "hoodie.archivelog.folder",
+ Self::TimelinePath => "hoodie.timeline.path",
+ Self::TimelineHistoryPath => "hoodie.timeline.history.path",
}
}
}
@@ -166,6 +180,9 @@ impl ConfigParser for HudiTableConfig {
Self::TimelineTimezone => Some(HudiConfigValue::String(
TimelineTimezoneValue::UTC.as_ref().to_string(),
)),
+ Self::ArchiveLogFolder =>
Some(HudiConfigValue::String(".hoodie/archived".to_string())),
+ Self::TimelinePath =>
Some(HudiConfigValue::String("timeline".to_string())),
+ Self::TimelineHistoryPath =>
Some(HudiConfigValue::String("history".to_string())),
_ => None,
}
}
@@ -238,6 +255,9 @@ impl ConfigParser for HudiTableConfig {
Self::TimelineTimezone => get_result
.and_then(TimelineTimezoneValue::from_str)
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
+ Self::ArchiveLogFolder => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
+ Self::TimelinePath => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
+ Self::TimelineHistoryPath => get_result.map(|v|
HudiConfigValue::String(v.to_string())),
}
}
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index 53abbfb..ae717fb 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -75,8 +75,8 @@ impl FileGroupReader {
/// Creates a new reader with the given base URI and options.
///
/// # Arguments
- /// * `base_uri` - The base URI of the file group's residing table.
- /// * `options` - Additional options for the reader.
+ /// * `base_uri` - The base URI of the file group's residing table.
+ /// * `options` - Additional options for the reader.
///
/// # Notes
/// This API uses [`OptionResolver`] that loads table properties from
storage to resolve options.
@@ -164,7 +164,7 @@ impl FileGroupReader {
/// Reads the data from the base file at the given relative path.
///
/// # Arguments
- /// * `relative_path` - The relative path to the base file.
+ /// * `relative_path` - The relative path to the base file.
///
/// # Returns
/// A record batch read from the base file.
@@ -216,7 +216,7 @@ impl FileGroupReader {
/// Reads the data from the given file slice.
///
/// # Arguments
- /// * `file_slice` - The file slice to read.
+ /// * `file_slice` - The file slice to read.
///
/// # Returns
/// A record batch read from the file slice.
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 3ba206a..9438db5 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -23,11 +23,11 @@
//! **Example**
//!
//! ```rust
-//! use hudi_core::config::read::HudiReadConfig::{AsOfTimestamp,
InputPartitions};
+//! use hudi_core::config::read::HudiReadConfig::InputPartitions;
//! use hudi_core::table::Table as HudiTable;
//!
-//! let options = [(InputPartitions, "2"), (AsOfTimestamp,
"20240101010100000")];
-//! HudiTable::new_with_options("/tmp/hudi_data", options);
+//! let options = [(InputPartitions, "2")];
+//! HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
//! ```
//!
//! # The [table] module is responsible for managing Hudi tables.
diff --git a/crates/core/src/metadata/commit.rs
b/crates/core/src/metadata/commit.rs
index 579874b..6d898e1 100644
--- a/crates/core/src/metadata/commit.rs
+++ b/crates/core/src/metadata/commit.rs
@@ -19,17 +19,23 @@
use crate::error::CoreError;
use crate::Result;
+use apache_avro::from_value;
+use apache_avro::Reader as AvroReader;
use apache_avro_derive::AvroSchema as DeriveAvroSchema;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::collections::HashMap;
+use std::io::Cursor;
/// Represents statistics for a single file write operation in a commit
///
/// This struct is automatically derived to/from Avro schema using
apache-avro-derive.
/// The Avro schema can be accessed via `HoodieWriteStat::get_schema()`.
-#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
-#[serde(rename_all = "camelCase")]
+///
+/// Note: For v8+ tables with Avro format, the data may carry fields that are not
+/// captured here, and fields listed here may be absent from the data. The struct is
+/// annotated with `#[serde(default)]` so that absent fields fall back to their defaults.
+#[derive(Debug, Clone, Default, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase", default)]
#[avro(namespace = "org.apache.hudi.avro.model")]
pub struct HoodieWriteStat {
#[avro(rename = "fileId")]
@@ -53,6 +59,43 @@ pub struct HoodieWriteStat {
pub total_write_bytes: Option<i64>,
#[avro(rename = "totalWriteErrors")]
pub total_write_errors: Option<i64>,
+ // Additional fields from v8+ Avro schema
+ #[avro(rename = "partitionPath")]
+ pub partition_path: Option<String>,
+ #[avro(rename = "totalLogRecords")]
+ pub total_log_records: Option<i64>,
+ #[avro(rename = "totalLogFiles")]
+ pub total_log_files: Option<i64>,
+ #[avro(rename = "totalUpdatedRecordsCompacted")]
+ pub total_updated_records_compacted: Option<i64>,
+ #[avro(rename = "totalLogBlocks")]
+ pub total_log_blocks: Option<i64>,
+ #[avro(rename = "totalCorruptLogBlock")]
+ pub total_corrupt_log_block: Option<i64>,
+ #[avro(rename = "totalRollbackBlocks")]
+ pub total_rollback_blocks: Option<i64>,
+ #[avro(rename = "fileSizeInBytes")]
+ pub file_size_in_bytes: Option<i64>,
+ #[avro(rename = "logVersion")]
+ pub log_version: Option<i32>,
+ #[avro(rename = "logOffset")]
+ pub log_offset: Option<i64>,
+ #[avro(rename = "prevBaseFile")]
+ pub prev_base_file: Option<String>,
+ #[avro(rename = "minEventTime")]
+ pub min_event_time: Option<i64>,
+ #[avro(rename = "maxEventTime")]
+ pub max_event_time: Option<i64>,
+ #[avro(rename = "totalLogFilesCompacted")]
+ pub total_log_files_compacted: Option<i64>,
+ #[avro(rename = "totalLogReadTimeMs")]
+ pub total_log_read_time_ms: Option<i64>,
+ #[avro(rename = "totalLogSizeCompacted")]
+ pub total_log_size_compacted: Option<i64>,
+ #[avro(rename = "tempPath")]
+ pub temp_path: Option<String>,
+ #[avro(rename = "numUpdates")]
+ pub num_updates: Option<i64>,
}
/// Represents the metadata for a Hudi commit
@@ -63,13 +106,14 @@ pub struct HoodieWriteStat {
/// # Example
/// ```
/// use hudi_core::metadata::commit::HoodieCommitMetadata;
+/// use apache_avro::schema::AvroSchema;
///
/// // Get the Avro schema
/// let schema = HoodieCommitMetadata::get_schema();
/// println!("Schema: {}", schema.canonical_form());
/// ```
-#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
-#[serde(rename_all = "camelCase")]
+#[derive(Debug, Clone, Default, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase", default)]
#[avro(namespace = "org.apache.hudi.avro.model")]
pub struct HoodieCommitMetadata {
pub version: Option<i32>,
@@ -99,6 +143,44 @@ impl HoodieCommitMetadata {
})
}
+    /// Decode commit metadata from an Avro Object Container payload (v8+ format).
+    ///
+    /// Table version 8 and later write commit/deltacommit instants as Avro container
+    /// files that embed their own schema. Only the first record of the container is
+    /// decoded; any further records are not read.
+    ///
+    /// # Errors
+    /// Returns [`CoreError::CommitMetadata`] when the Avro reader cannot be created,
+    /// when the container holds no records, or when the first record cannot be read
+    /// or deserialized.
+    pub fn from_avro_bytes(bytes: &[u8]) -> Result<Self> {
+        let mut container = AvroReader::new(Cursor::new(bytes)).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to create Avro reader: {}", e))
+        })?;
+
+        // A commit metadata file is expected to carry a single record; take the first.
+        let first_record = match container.next() {
+            Some(Ok(record)) => record,
+            Some(Err(e)) => {
+                return Err(CoreError::CommitMetadata(format!(
+                    "Failed to read Avro record: {}",
+                    e
+                )));
+            }
+            None => {
+                return Err(CoreError::CommitMetadata(
+                    "Avro file contains no records".to_string(),
+                ));
+            }
+        };
+
+        from_value::<Self>(&first_record).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to deserialize Avro value: {}", e))
+        })
+    }
+
+    /// Serialize this commit metadata into a `serde_json` object map.
+    ///
+    /// This lets metadata that was decoded from Avro be handed to code that
+    /// operates on a JSON [`Map`].
+    ///
+    /// # Errors
+    /// Returns [`CoreError::CommitMetadata`] if serialization fails or if the
+    /// serialized value is not a JSON object.
+    pub fn to_json_map(&self) -> Result<Map<String, Value>> {
+        let serialized = serde_json::to_value(self).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to convert to JSON value: {}", e))
+        })?;
+
+        if let Value::Object(fields) = serialized {
+            return Ok(fields);
+        }
+        Err(CoreError::CommitMetadata(
+            "Expected JSON object".to_string(),
+        ))
+    }
+
/// Get the write stats for a specific partition
pub fn get_partition_write_stats(&self, partition: &str) ->
Option<&Vec<HoodieWriteStat>> {
self.partition_to_write_stats
@@ -432,4 +514,144 @@ mod tests {
let count = metadata.iter_replace_file_ids().count();
assert_eq!(count, 0);
}
+
+    #[test]
+    fn test_parse_v6_commit_json() {
+        // v6 COW table commit metadata is stored as JSON.
+        let fixture_path = concat!(
+            env!("CARGO_MANIFEST_DIR"),
+            "/tests/data/commit_metadata/v6_commit.json"
+        );
+        let raw = std::fs::read(fixture_path).expect("Failed to read test fixture");
+        let metadata = HoodieCommitMetadata::from_json_bytes(&raw)
+            .expect("Failed to parse v6 commit metadata");
+
+        // Top-level commit fields.
+        assert_eq!(metadata.operation_type.as_deref(), Some("UPSERT"));
+        assert_eq!(metadata.compacted, Some(false));
+
+        // Write stats are looked up under the empty partition path.
+        let write_stats = metadata
+            .get_partition_write_stats("")
+            .expect("Should have write stats for empty partition");
+        assert_eq!(write_stats.len(), 1);
+
+        // Identity and counters of the single write stat.
+        let stat = &write_stats[0];
+        assert_eq!(
+            stat.file_id.as_deref(),
+            Some("a079bdb3-731c-4894-b855-abfcd6921007-0")
+        );
+        assert_eq!(stat.prev_commit.as_deref(), Some("20240418173550988"));
+        assert_eq!(stat.num_writes, Some(4));
+        assert_eq!(stat.num_inserts, Some(1));
+        assert_eq!(stat.num_deletes, Some(0));
+        assert_eq!(stat.num_update_writes, Some(1));
+        assert_eq!(stat.total_write_bytes, Some(441520));
+    }
+
+    #[test]
+    fn test_parse_v8_deltacommit_avro() {
+        // v8 MOR table deltacommit metadata is stored in Avro format.
+        let fixture_path = concat!(
+            env!("CARGO_MANIFEST_DIR"),
+            "/tests/data/commit_metadata/v8_deltacommit.avro"
+        );
+        let raw = std::fs::read(fixture_path).expect("Failed to read test fixture");
+        let metadata = HoodieCommitMetadata::from_avro_bytes(&raw)
+            .expect("Failed to parse v8 deltacommit metadata");
+
+        assert_eq!(metadata.operation_type, Some("UPSERT".to_string()));
+
+        // The partition -> write stats map must exist and hold at least one partition.
+        assert!(metadata.partition_to_write_stats.is_some());
+        let partition_map = metadata.partition_to_write_stats.as_ref().unwrap();
+        assert!(
+            !partition_map.is_empty(),
+            "Should have write stats for at least one partition"
+        );
+
+        // Spot-check the first partition found with a non-empty stats list. If every
+        // list is empty, none of the per-stat checks below run.
+        let first_with_stats = partition_map.iter().find(|(_, stats)| !stats.is_empty());
+        if let Some((partition, stats)) = first_with_stats {
+            let stat = &stats[0];
+            assert!(
+                stat.file_id.is_some(),
+                "file_id should be present in partition {}",
+                partition
+            );
+            // The counters are optional; when present they must be non-negative.
+            assert!(
+                stat.num_inserts.map_or(true, |n| n >= 0),
+                "num_inserts should be >= 0"
+            );
+            assert!(
+                stat.num_update_writes.map_or(true, |n| n >= 0),
+                "num_update_writes should be >= 0"
+            );
+            // partition_path is one of the fields added for the v8+ Avro schema.
+            assert!(
+                stat.partition_path.is_some(),
+                "partition_path should be present in v8+ format"
+            );
+        }
+    }
+
+    #[test]
+    fn test_to_json_map() {
+        // Only two fields are set; everything else comes from `Default`.
+        let json_map = HoodieCommitMetadata {
+            version: Some(1),
+            operation_type: Some("INSERT".to_string()),
+            ..Default::default()
+        }
+        .to_json_map()
+        .unwrap();
+
+        // Keys are emitted in camelCase.
+        for key in ["version", "operationType"] {
+            assert!(json_map.contains_key(key));
+        }
+    }
+
+    #[test]
+    fn test_from_json_map() {
+        // Build the input map from key/value pairs in one expression.
+        let map: Map<String, Value> = [
+            ("version".to_string(), json!(2)),
+            ("operationType".to_string(), json!("UPSERT")),
+        ]
+        .into_iter()
+        .collect();
+
+        let metadata = HoodieCommitMetadata::from_json_map(&map).unwrap();
+        assert_eq!(metadata.version, Some(2));
+        assert_eq!(metadata.operation_type, Some("UPSERT".to_string()));
+    }
+
+    #[test]
+    fn test_get_partitions_with_replacements_sorting() {
+        let metadata: HoodieCommitMetadata = serde_json::from_value(json!({
+            "partitionToReplaceFileIds": {
+                "p1": ["file1"],
+                "p2": ["file2", "file3"]
+            }
+        }))
+        .unwrap();
+
+        // The returned order is not relied upon, so sort before comparing.
+        let mut partitions = metadata.get_partitions_with_replacements();
+        partitions.sort_unstable();
+
+        let expected = vec!["p1".to_string(), "p2".to_string()];
+        assert_eq!(partitions, expected);
+    }
+
+    #[test]
+    fn test_iter_replace_file_ids_multiple_partitions() {
+        let metadata: HoodieCommitMetadata = serde_json::from_value(json!({
+            "partitionToReplaceFileIds": {
+                "p1": ["file1"],
+                "p2": ["file2", "file3"]
+            }
+        }))
+        .unwrap();
+
+        // One file id from p1 plus two from p2.
+        assert_eq!(metadata.iter_replace_file_ids().count(), 3);
+    }
}
diff --git a/crates/core/src/timeline/builder.rs
b/crates/core/src/timeline/builder.rs
new file mode 100644
index 0000000..43a55e0
--- /dev/null
+++ b/crates/core/src/timeline/builder.rs
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config::internal::HudiInternalConfig::TimelineArchivedReadEnabled;
+use crate::config::table::HudiTableConfig::{TableVersion,
TimelineLayoutVersion};
+use crate::config::HudiConfigs;
+use crate::error::CoreError;
+use crate::storage::Storage;
+use crate::timeline::loader::TimelineLoader;
+use crate::timeline::Timeline;
+use crate::Result;
+use std::sync::Arc;
+
+/// Builds a [`Timeline`] by selecting the active (and optionally archived) timeline
+/// loaders that match the table version and timeline layout version in the configs.
+pub struct TimelineBuilder {
+    hudi_configs: Arc<HudiConfigs>,
+    storage: Arc<Storage>,
+}
+
+impl TimelineBuilder {
+    /// Creates a builder over the given table configs and storage.
+    pub fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
+        Self {
+            hudi_configs,
+            storage,
+        }
+    }
+
+    /// Resolves the loaders for this table and constructs the [`Timeline`].
+    ///
+    /// # Errors
+    /// Returns [`CoreError::Unsupported`] when the table version and timeline layout
+    /// version form an unsupported combination.
+    pub async fn build(self) -> Result<Timeline> {
+        let (active_loader, archived_loader) = self.resolve_loader_config()?;
+        Ok(Timeline::new(
+            self.hudi_configs,
+            self.storage,
+            active_loader,
+            archived_loader,
+        ))
+    }
+
+    /// Picks the `(active, archived)` loaders from the configured versions.
+    ///
+    /// The archived loader is `Some` only when `TimelineArchivedReadEnabled` is true.
+    /// When the table version cannot be read from the configs, layout 1 is used.
+    /// When no layout version is configured, it defaults to 2 for table version 8
+    /// and above, and to 1 otherwise.
+    fn resolve_loader_config(&self) -> Result<(TimelineLoader, Option<TimelineLoader>)> {
+        let archived_enabled: bool = self
+            .hudi_configs
+            .get_or_default(TimelineArchivedReadEnabled)
+            .into();
+
+        let table_version: isize = match self.hudi_configs.get(TableVersion) {
+            Ok(v) => v.into(),
+            // No usable table version: fall back to the layout 1 loaders.
+            Err(_) => return Ok(self.layout_one_loaders(archived_enabled)),
+        };
+
+        // Default to layout 2 for every table version that the layout 2 arm below
+        // accepts (>= 8), not only for version 8 itself; otherwise a newer table
+        // without an explicit layout version would be routed to layout 1 and rejected.
+        let layout_version: isize = self
+            .hudi_configs
+            .try_get(TimelineLayoutVersion)
+            .map(|v| v.into())
+            .unwrap_or_else(|| if table_version >= 8 { 2isize } else { 1isize });
+
+        match layout_version {
+            1 => {
+                if !(6..=8).contains(&table_version) {
+                    return Err(CoreError::Unsupported(format!(
+                        "Unsupported table version {} with timeline layout version {}",
+                        table_version, layout_version
+                    )));
+                }
+                Ok(self.layout_one_loaders(archived_enabled))
+            }
+            2 => {
+                if table_version < 8 {
+                    return Err(CoreError::Unsupported(format!(
+                        "Unsupported table version {} with timeline layout version {}",
+                        table_version, layout_version
+                    )));
+                }
+                Ok(self.layout_two_loaders(archived_enabled))
+            }
+            _ => Err(CoreError::Unsupported(format!(
+                "Unsupported timeline layout version: {}",
+                layout_version
+            ))),
+        }
+    }
+
+    /// Creates the layout 1 loaders; the archived loader only when enabled.
+    fn layout_one_loaders(
+        &self,
+        archived_enabled: bool,
+    ) -> (TimelineLoader, Option<TimelineLoader>) {
+        let archived_loader = archived_enabled.then(|| {
+            TimelineLoader::new_layout_one_archived(
+                self.hudi_configs.clone(),
+                self.storage.clone(),
+            )
+        });
+        let active_loader = TimelineLoader::new_layout_one_active(
+            self.hudi_configs.clone(),
+            self.storage.clone(),
+        );
+        (active_loader, archived_loader)
+    }
+
+    /// Creates the layout 2 loaders; the archived loader only when enabled.
+    fn layout_two_loaders(
+        &self,
+        archived_enabled: bool,
+    ) -> (TimelineLoader, Option<TimelineLoader>) {
+        let archived_loader = archived_enabled.then(|| {
+            TimelineLoader::new_layout_two_archived(
+                self.hudi_configs.clone(),
+                self.storage.clone(),
+            )
+        });
+        let active_loader = TimelineLoader::new_layout_two_active(
+            self.hudi_configs.clone(),
+            self.storage.clone(),
+        );
+        (active_loader, archived_loader)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::config::table::HudiTableConfig;
+    use std::collections::HashMap;
+
+    /// Wraps the options in [`HudiConfigs`], filling in a base path when none is given.
+    fn create_test_configs(options: HashMap<String, String>) -> Arc<HudiConfigs> {
+        let mut opts = options;
+        let base_path_key = HudiTableConfig::BasePath.as_ref().to_string();
+        if !opts.contains_key(&base_path_key) {
+            opts.insert(base_path_key, "/tmp/test".to_string());
+        }
+        Arc::new(HudiConfigs::new(opts))
+    }
+
+    /// Creates a storage handle for the given configs with no storage options.
+    fn create_test_storage(configs: Arc<HudiConfigs>) -> Arc<Storage> {
+        Storage::new(Arc::new(HashMap::new()), configs).unwrap()
+    }
+
+    /// Runs the builder against configs made from the given key/value pairs.
+    async fn build_with(pairs: &[(&str, &str)]) -> Result<Timeline> {
+        let options: HashMap<String, String> = pairs
+            .iter()
+            .map(|(key, value)| (key.to_string(), value.to_string()))
+            .collect();
+        let configs = create_test_configs(options);
+        let storage = create_test_storage(configs.clone());
+        TimelineBuilder::new(configs, storage).build().await
+    }
+
+    #[tokio::test]
+    async fn test_build_without_table_version() {
+        let timeline = build_with(&[]).await.unwrap();
+        // Without a table version the builder falls back to layout one.
+        assert!(!timeline.active_loader.is_layout_two_active());
+        assert!(timeline.archived_loader.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_build_with_table_version_6() {
+        let timeline = build_with(&[(HudiTableConfig::TableVersion.as_ref(), "6")])
+            .await
+            .unwrap();
+        // v6 uses layout one.
+        assert!(!timeline.active_loader.is_layout_two_active());
+        assert!(timeline.archived_loader.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_build_with_table_version_8_defaults_to_layout_2() {
+        let timeline = build_with(&[(HudiTableConfig::TableVersion.as_ref(), "8")])
+            .await
+            .unwrap();
+        // v8 with no explicit layout version resolves to layout 2.
+        assert!(timeline.active_loader.is_layout_two_active());
+        assert!(timeline.archived_loader.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_build_with_explicit_layout_version_1() {
+        let timeline = build_with(&[
+            (HudiTableConfig::TableVersion.as_ref(), "8"),
+            (HudiTableConfig::TimelineLayoutVersion.as_ref(), "1"),
+        ])
+        .await
+        .unwrap();
+        // An explicit layout version 1 wins over the v8 default.
+        assert!(!timeline.active_loader.is_layout_two_active());
+    }
+
+    #[tokio::test]
+    async fn test_build_with_archived_enabled() {
+        let timeline = build_with(&[
+            (HudiTableConfig::TableVersion.as_ref(), "8"),
+            (TimelineArchivedReadEnabled.as_ref(), "true"),
+        ])
+        .await
+        .unwrap();
+        assert!(timeline.active_loader.is_layout_two_active());
+        assert!(timeline.archived_loader.is_some());
+        let archived = timeline.archived_loader.as_ref().unwrap();
+        assert!(archived.is_layout_two_archived());
+    }
+
+    #[tokio::test]
+    async fn test_build_layout_1_with_archived() {
+        let timeline = build_with(&[
+            (HudiTableConfig::TableVersion.as_ref(), "7"),
+            (TimelineArchivedReadEnabled.as_ref(), "true"),
+        ])
+        .await
+        .unwrap();
+        assert!(!timeline.active_loader.is_layout_two_active());
+        assert!(timeline.archived_loader.is_some());
+        let archived = timeline.archived_loader.as_ref().unwrap();
+        assert!(!archived.is_layout_two_archived());
+    }
+
+    #[tokio::test]
+    async fn test_unsupported_table_version_with_layout_1() {
+        let result = build_with(&[
+            (HudiTableConfig::TableVersion.as_ref(), "5"),
+            (HudiTableConfig::TimelineLayoutVersion.as_ref(), "1"),
+        ])
+        .await;
+        assert!(result.is_err());
+        let message = result.unwrap_err().to_string();
+        assert!(message.contains("Unsupported table version 5"));
+    }
+
+    #[tokio::test]
+    async fn test_unsupported_table_version_with_layout_2() {
+        let result = build_with(&[
+            (HudiTableConfig::TableVersion.as_ref(), "7"),
+            (HudiTableConfig::TimelineLayoutVersion.as_ref(), "2"),
+        ])
+        .await;
+        assert!(result.is_err());
+        let message = result.unwrap_err().to_string();
+        assert!(message.contains("Unsupported table version 7"));
+    }
+
+    #[tokio::test]
+    async fn test_unsupported_layout_version() {
+        let result = build_with(&[
+            (HudiTableConfig::TableVersion.as_ref(), "8"),
+            (HudiTableConfig::TimelineLayoutVersion.as_ref(), "3"),
+        ])
+        .await;
+        assert!(result.is_err());
+        let message = result.unwrap_err().to_string();
+        assert!(message.contains("Unsupported timeline layout version: 3"));
+    }
+}
diff --git a/crates/core/src/timeline/instant.rs
b/crates/core/src/timeline/instant.rs
index e3e191e..1cd29af 100644
--- a/crates/core/src/timeline/instant.rs
+++ b/crates/core/src/timeline/instant.rs
@@ -98,10 +98,19 @@ impl AsRef<str> for State {
}
/// An [Instant] represents a point in time when an action was performed on
the table.
+///
+/// For table version 8+, completed instants have a different filename format:
+/// `{requestedTimestamp}_{completedTimestamp}.{action}` instead of
`{timestamp}.{action}`.
+/// The `timestamp` field stores the requested timestamp, and
`completed_timestamp` stores
+/// the completion timestamp for v8+ completed instants.
#[allow(dead_code)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Instant {
+ /// The timestamp when the action was requested (used for ordering and
identification).
+ /// TODO rename to requested_timestamp for clarity in v8+?
pub timestamp: String,
+ /// The timestamp when the action completed (only present for v8+
completed instants).
+ pub completed_timestamp: Option<String>,
pub action: Action,
pub state: State,
pub epoch_millis: i64,
@@ -132,19 +141,47 @@ impl FromStr for Instant {
impl Instant {
pub fn try_from_file_name_and_timezone(file_name: &str, timezone: &str) ->
Result<Self> {
- let (timestamp, action_suffix) = file_name
+ let (timestamp_part, action_suffix) = file_name
.split_once('.')
.ok_or_else(|| CoreError::Timeline(format!("Invalid file name:
{}", file_name)))?;
- Self::validate_timestamp(timestamp)?;
- let dt = Self::parse_datetime(timestamp, timezone)?;
+
let (action, state) = Self::parse_action_and_state(action_suffix)?;
- Ok(Self {
- timestamp: timestamp.to_string(),
- state,
- action,
- epoch_millis: dt.timestamp_millis(),
- })
+ // Check for v8+ completed instant format:
{requestedTimestamp}_{completedTimestamp}.{action}
+ // This format is only used for completed instants (state == Completed)
+ if let Some((requested_ts, completed_ts)) =
timestamp_part.split_once('_') {
+ // This is a v8+ completed instant with both requested and
completed timestamps
+ Self::validate_timestamp(requested_ts)?;
+ Self::validate_timestamp(completed_ts)?;
+ let dt = Self::parse_datetime(requested_ts, timezone)?;
+
+ if state != State::Completed {
+ return Err(CoreError::Timeline(format!(
+ "Underscore timestamp format is only valid for completed
instants: {}",
+ file_name
+ )));
+ }
+
+ Ok(Self {
+ timestamp: requested_ts.to_string(),
+ completed_timestamp: Some(completed_ts.to_string()),
+ state,
+ action,
+ epoch_millis: dt.timestamp_millis(),
+ })
+ } else {
+ // pre v8 format: {timestamp}.{action}[.{state}]
+ Self::validate_timestamp(timestamp_part)?;
+ let dt = Self::parse_datetime(timestamp_part, timezone)?;
+
+ Ok(Self {
+ timestamp: timestamp_part.to_string(),
+ completed_timestamp: None,
+ state,
+ action,
+ epoch_millis: dt.timestamp_millis(),
+ })
+ }
}
fn validate_timestamp(timestamp: &str) -> Result<()> {
@@ -203,7 +240,19 @@ impl Instant {
pub fn file_name(&self) -> String {
match (&self.action, &self.state) {
- (_, State::Completed) => format!("{}.{}", self.timestamp,
self.action.as_ref()),
+ (_, State::Completed) => {
+ // For v8+ completed instants with completed_timestamp, use
the underscore format
+ if let Some(completed_ts) = &self.completed_timestamp {
+ format!(
+ "{}_{}.{}",
+ self.timestamp,
+ completed_ts,
+ self.action.as_ref()
+ )
+ } else {
+ format!("{}.{}", self.timestamp, self.action.as_ref())
+ }
+ }
(Action::Commit, State::Inflight) => {
format!("{}.{}", self.timestamp, self.state.as_ref())
}
@@ -216,8 +265,16 @@ impl Instant {
}
}
+ /// Get the relative path with the default `.hoodie/` base directory.
+ /// For v8+ tables with Layout Version 2, use `relative_path_with_base`
instead.
pub fn relative_path(&self) -> Result<String> {
- let mut commit_file_path = PathBuf::from(HUDI_METADATA_DIR);
+ self.relative_path_with_base(HUDI_METADATA_DIR)
+ }
+
+ /// Get the relative path with a specified base directory.
+ /// Use `.hoodie/` for pre-v8 tables (Layout Version 1) or
`.hoodie/timeline/` for v8+ tables (Layout Version 2).
+ pub fn relative_path_with_base(&self, base_dir: &str) -> Result<String> {
+ let mut commit_file_path = PathBuf::from(base_dir);
commit_file_path.push(self.file_name());
commit_file_path
.to_str()
@@ -379,4 +436,106 @@ mod tests {
None => std::env::remove_var("TZ"),
}
}
+
+    /// `parse_action_and_state` maps a file-name suffix to its action and state.
+    #[test]
+    fn test_parse_action_and_state() -> Result<()> {
+        // (suffix, expected action, expected state)
+        let cases = [
+            // Action followed by an explicit state suffix
+            ("commit.inflight", Action::Commit, State::Inflight),
+            ("deltacommit.requested", Action::DeltaCommit, State::Requested),
+            // Standalone "inflight" is the special case for an inflight commit
+            ("inflight", Action::Commit, State::Inflight),
+            // No state suffix means the instant is completed
+            ("commit", Action::Commit, State::Completed),
+        ];
+
+        for (suffix, expected_action, expected_state) in cases {
+            let (action, state) = Instant::parse_action_and_state(suffix)?;
+            assert_eq!(action, expected_action);
+            assert_eq!(state, expected_state);
+        }
+
+        Ok(())
+    }
+
+    /// The instant file name is joined onto whichever base directory is supplied.
+    #[test]
+    fn test_relative_path_with_base() -> Result<()> {
+        let instant = Instant::from_str("20240101120000.commit")?;
+
+        assert_eq!(
+            instant.relative_path_with_base(".hoodie")?,
+            ".hoodie/20240101120000.commit"
+        );
+        assert_eq!(
+            instant.relative_path_with_base(".hoodie/timeline")?,
+            ".hoodie/timeline/20240101120000.commit"
+        );
+
+        Ok(())
+    }
+
+    /// Only instants whose action is `replacecommit` report `is_replacecommit()`.
+    #[test]
+    fn test_is_replacecommit() -> Result<()> {
+        assert!(Instant::from_str("20240101120000.replacecommit")?.is_replacecommit());
+        assert!(!Instant::from_str("20240101120000.commit")?.is_replacecommit());
+
+        Ok(())
+    }
+
+    /// Parsing the v8+ file name `{requestedTimestamp}_{completedTimestamp}.{action}`
+    /// keeps both timestamps and yields a completed instant.
+    #[test]
+    fn test_v8_instant_with_completed_timestamp() -> Result<()> {
+        let parsed = Instant::from_str("20240101120000000_20240101120005000.commit")?;
+
+        assert_eq!(parsed.timestamp, "20240101120000000");
+        assert_eq!(
+            parsed.completed_timestamp.as_deref(),
+            Some("20240101120005000")
+        );
+        assert_eq!(parsed.action, Action::Commit);
+        assert_eq!(parsed.state, State::Completed);
+
+        Ok(())
+    }
+
+    /// The underscore (two-timestamp) file name is rejected for non-completed states.
+    #[test]
+    fn test_v8_instant_underscore_format_requires_completed_state() {
+        let outcome = Instant::from_str("20240101120000000_20240101120005000.commit.inflight");
+        assert!(outcome.is_err());
+
+        let message = outcome.unwrap_err().to_string();
+        assert!(message.contains("only valid for completed"));
+    }
+
+    /// A parsed v8+ completed instant renders back to the same underscore file name.
+    #[test]
+    fn test_file_name_with_completed_timestamp() -> Result<()> {
+        let original_name = "20240101120000000_20240101120005000.commit";
+        let instant = Instant::from_str(original_name)?;
+
+        assert_eq!(
+            instant.file_name(),
+            "20240101120000000_20240101120005000.commit"
+        );
+
+        Ok(())
+    }
+
+    /// A 17-digit timestamp keeps its trailing milliseconds when parsed.
+    #[test]
+    fn test_parse_datetime_with_milliseconds() -> Result<()> {
+        let parsed = Instant::parse_datetime("20240315142530500", "UTC")?;
+        let millis_part = parsed.timestamp_millis() % 1000;
+        assert_eq!(millis_part, 500);
+
+        Ok(())
+    }
+
+    /// File names whose timestamp part has an unsupported length fail to parse.
+    #[test]
+    fn test_validate_timestamp_errors() {
+        let invalid_names = [
+            // Too short
+            "2024.commit",
+            // Too long (not 14 or 17)
+            "202403151425301.commit",
+        ];
+
+        for name in invalid_names {
+            assert!(Instant::from_str(name).is_err());
+        }
+    }
}
diff --git a/crates/core/src/timeline/loader.rs
b/crates/core/src/timeline/loader.rs
new file mode 100644
index 0000000..52db158
--- /dev/null
+++ b/crates/core/src/timeline/loader.rs
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config::table::HudiTableConfig::{ArchiveLogFolder,
TimelineHistoryPath, TimelinePath};
+use crate::config::HudiConfigs;
+use crate::error::CoreError;
+use crate::metadata::commit::HoodieCommitMetadata;
+use crate::metadata::HUDI_METADATA_DIR;
+use crate::storage::Storage;
+use crate::timeline::instant::Instant;
+use crate::timeline::selector::TimelineSelector;
+use crate::Result;
+use log::debug;
+use serde_json::{Map, Value};
+use std::sync::Arc;
+
+/// Loads timeline instants and instant metadata from storage for one
+/// specific timeline layout (see `TimelineLayout`).
+#[derive(Debug, Clone)]
+pub struct TimelineLoader {
+ // Configs used to resolve the timeline/archive directory names.
+ hudi_configs: Arc<HudiConfigs>,
+ // Storage used to list timeline files and read their contents.
+ storage: Arc<Storage>,
+ // Which layout version and timeline part (active/archived) this loader reads.
+ layout: TimelineLayout,
+}
+
+/// Layout version (V1 or V2) combined with the timeline part (active or
+/// archived) that a `TimelineLoader` reads from.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum TimelineLayout {
+ // Layout One, active timeline.
+ V1Active,
+ // Layout One, archived timeline.
+ V1Archived,
+ // Layout Two, active timeline.
+ V2Active,
+ // Layout Two, archived timeline.
+ V2Archived,
+}
+
+// NOTE(review): `dead_code` is allowed for the whole impl, presumably because not every
+// constructor/helper is wired into callers yet - confirm and narrow the allow once they are.
+#[allow(dead_code)]
+impl TimelineLoader {
+    /// Shared constructor behind the public `new_layout_*` functions.
+    fn with_layout(
+        hudi_configs: Arc<HudiConfigs>,
+        storage: Arc<Storage>,
+        layout: TimelineLayout,
+    ) -> Self {
+        Self {
+            hudi_configs,
+            storage,
+            layout,
+        }
+    }
+
+    /// Create a new Layout One Active loader
+    pub fn new_layout_one_active(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
+        Self::with_layout(hudi_configs, storage, TimelineLayout::V1Active)
+    }
+
+    /// Create a new Layout One Archived loader
+    pub fn new_layout_one_archived(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
+        Self::with_layout(hudi_configs, storage, TimelineLayout::V1Archived)
+    }
+
+    /// Create a new Layout Two Active loader
+    pub fn new_layout_two_active(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
+        Self::with_layout(hudi_configs, storage, TimelineLayout::V2Active)
+    }
+
+    /// Create a new Layout Two Archived loader
+    pub fn new_layout_two_archived(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
+        Self::with_layout(hudi_configs, storage, TimelineLayout::V2Archived)
+    }
+
+    /// Returns the storage for this loader.
+    fn storage(&self) -> &Arc<Storage> {
+        &self.storage
+    }
+
+    /// Check if this is a Layout Two Active loader (for testing/assertions)
+    #[cfg(test)]
+    pub(crate) fn is_layout_two_active(&self) -> bool {
+        matches!(self.layout, TimelineLayout::V2Active)
+    }
+
+    /// Check if this is a Layout Two Archived loader (for testing/assertions)
+    #[cfg(test)]
+    pub(crate) fn is_layout_two_archived(&self) -> bool {
+        matches!(self.layout, TimelineLayout::V2Archived)
+    }
+
+    /// Returns the directory for active timeline instants.
+    ///
+    /// - Layout One (pre-v8 tables): `.hoodie/`
+    /// - Layout Two (v8+ tables): `.hoodie/{timeline_path}` (configurable via
+    ///   `hoodie.timeline.path`, default: `timeline`)
+    fn get_active_timeline_dir(&self) -> String {
+        match self.layout {
+            TimelineLayout::V1Active | TimelineLayout::V1Archived => HUDI_METADATA_DIR.to_string(),
+            TimelineLayout::V2Active | TimelineLayout::V2Archived => {
+                let timeline_path: String = self.hudi_configs.get_or_default(TimelinePath).into();
+                format!("{}/{}", HUDI_METADATA_DIR, timeline_path)
+            }
+        }
+    }
+
+    /// Returns the directory for archived timeline instants.
+    ///
+    /// - Layout One (pre-v8 tables): configurable via `hoodie.archivelog.folder`
+    ///   (default: `.hoodie/archived`)
+    /// - Layout Two (v8+ tables): `.hoodie/{timeline_path}/{history_path}` (LSM history)
+    fn get_archived_timeline_dir(&self) -> String {
+        match self.layout {
+            TimelineLayout::V1Active | TimelineLayout::V1Archived => {
+                // Layout 1 uses hoodie.archivelog.folder for archived timeline
+                self.hudi_configs.get_or_default(ArchiveLogFolder).into()
+            }
+            TimelineLayout::V2Active | TimelineLayout::V2Archived => {
+                // Layout 2 keeps the LSM history directory under the active timeline directory
+                let history_path: String =
+                    self.hudi_configs.get_or_default(TimelineHistoryPath).into();
+                format!("{}/{}", self.get_active_timeline_dir(), history_path)
+            }
+        }
+    }
+
+    /// Returns the appropriate timeline directory based on loader type (active vs archived).
+    ///
+    /// This is a convenience method that delegates to either `get_active_timeline_dir`
+    /// or `get_archived_timeline_dir` depending on the layout.
+    pub fn get_timeline_dir(&self) -> String {
+        match self.layout {
+            TimelineLayout::V1Active | TimelineLayout::V2Active => self.get_active_timeline_dir(),
+            TimelineLayout::V1Archived | TimelineLayout::V2Archived => {
+                self.get_archived_timeline_dir()
+            }
+        }
+    }
+
+    /// Load instants from the active timeline directory of this loader.
+    ///
+    /// Files that the selector cannot turn into an instant are skipped (logged at debug
+    /// level). The result is sorted, ascending by default or descending when `desc` is true.
+    ///
+    /// # Errors
+    ///
+    /// Returns `CoreError::Unsupported` when called on an archived loader, and propagates
+    /// any storage listing error.
+    pub async fn load_instants(
+        &self,
+        selector: &TimelineSelector,
+        desc: bool,
+    ) -> Result<Vec<Instant>> {
+        // Layout Two needs extra filtering of non-instant entries; archived layouts are
+        // not served by this method at all.
+        let is_layout_two = match self.layout {
+            TimelineLayout::V1Active => false,
+            TimelineLayout::V2Active => true,
+            TimelineLayout::V1Archived | TimelineLayout::V2Archived => {
+                return Err(CoreError::Unsupported(
+                    "Loading from this timeline layout is not implemented yet.".to_string(),
+                ));
+            }
+        };
+
+        let timeline_dir = self.get_timeline_dir();
+        let files = self.storage.list_files(Some(&timeline_dir)).await?;
+
+        // Prefix of entries that belong to the (configurable) LSM history directory.
+        let history_prefix = if is_layout_two {
+            let history_path: String =
+                self.hudi_configs.get_or_default(TimelineHistoryPath).into();
+            Some(format!("{}/", history_path))
+        } else {
+            None
+        };
+
+        // Most loads ask for completed instants only, so roughly a third of the listed
+        // files (ignoring requested and inflight ones) is a reasonable capacity guess.
+        let mut instants = Vec::with_capacity(files.len() / 3);
+
+        for file_info in files {
+            let name = file_info.name.as_str();
+            // TODO: make `storage.list_files` api support such filtering, like ignore crc
+            // and return files only
+            if let Some(prefix) = &history_prefix {
+                if name.starts_with(prefix.as_str()) || name.ends_with(".crc") {
+                    continue;
+                }
+            }
+            match selector.try_create_instant(name) {
+                Ok(instant) => instants.push(instant),
+                Err(e) => {
+                    debug!(
+                        "Instant not created from file {:?} due to: {:?}",
+                        file_info, e
+                    );
+                }
+            }
+        }
+
+        instants.sort_unstable();
+        if desc {
+            // Reverse in place rather than collecting into a second vector.
+            instants.reverse();
+        }
+        instants.shrink_to_fit();
+
+        Ok(instants)
+    }
+
+    /// Load archived timeline instants based on selector criteria.
+    ///
+    /// # Behavior
+    ///
+    /// - Returns empty Vec if this is an active loader (not an archived loader)
+    /// - Layout One archived: lists the archive folder and keeps every file the selector
+    ///   accepts; rejected files are skipped (logged at debug level)
+    /// - Layout Two archived: not implemented yet, returns empty Vec
+    ///
+    /// Note: This method assumes the archived loader was created only when
+    /// `TimelineArchivedReadEnabled` is true. The config check is done in the builder.
+    ///
+    /// # Arguments
+    ///
+    /// * `selector` - The criteria for selecting instants (actions, states, time range)
+    /// * `desc` - If true, return instants in descending order by timestamp
+    pub(crate) async fn load_archived_instants(
+        &self,
+        selector: &TimelineSelector,
+        desc: bool,
+    ) -> Result<Vec<Instant>> {
+        match self.layout {
+            // Active loaders don't have archived parts.
+            TimelineLayout::V1Active | TimelineLayout::V2Active => Ok(Vec::new()),
+            TimelineLayout::V1Archived => {
+                // Archive folder resolved from configs (or its default)
+                let archive_dir = self.get_archived_timeline_dir();
+                let files = self.storage.list_files(Some(&archive_dir)).await?;
+
+                let mut instants = Vec::new();
+                for file_info in files {
+                    match selector.try_create_instant(file_info.name.as_str()) {
+                        Ok(instant) => instants.push(instant),
+                        Err(e) => {
+                            debug!(
+                                "Archived instant not created from file {:?} due to: {:?}",
+                                file_info, e
+                            );
+                        }
+                    }
+                }
+
+                instants.sort_unstable();
+                if desc {
+                    instants.reverse();
+                }
+                Ok(instants)
+            }
+            // TODO: Implement v2 LSM history reader. For now, return empty.
+            TimelineLayout::V2Archived => Ok(Vec::new()),
+        }
+    }
+
+    /// Relative path of the instant's file inside this loader's timeline directory.
+    fn instant_path(&self, instant: &Instant) -> Result<String> {
+        instant.relative_path_with_base(&self.get_timeline_dir())
+    }
+
+    /// Load instant metadata from storage and parse based on the layout version.
+    ///
+    /// Layout Version 1 (pre-v8 tables): JSON format
+    /// Layout Version 2 (v8+ tables): Avro format
+    ///
+    /// Returns the metadata as a JSON Map for uniform processing.
+    pub(crate) async fn load_instant_metadata(
+        &self,
+        instant: &Instant,
+    ) -> Result<Map<String, Value>> {
+        let path = self.instant_path(instant)?;
+        let bytes = self.storage.get_file_data(path.as_str()).await?;
+
+        match self.layout {
+            TimelineLayout::V1Active | TimelineLayout::V1Archived => {
+                // Layout 1: JSON format
+                serde_json::from_slice(&bytes).map_err(|e| {
+                    CoreError::Timeline(format!("Failed to parse JSON commit metadata: {}", e))
+                })
+            }
+            TimelineLayout::V2Active | TimelineLayout::V2Archived => {
+                // Layout 2: Avro format
+                let metadata = HoodieCommitMetadata::from_avro_bytes(&bytes)?;
+                metadata.to_json_map()
+            }
+        }
+    }
+
+    /// Load instant metadata and return as a JSON string.
+    ///
+    /// Layout Version 1 (pre-v8 tables): Return raw JSON bytes
+    /// Layout Version 2 (v8+ tables): Parse Avro and serialize to JSON
+    pub(crate) async fn load_instant_metadata_as_json(&self, instant: &Instant) -> Result<String> {
+        let path = self.instant_path(instant)?;
+        let bytes = self.storage.get_file_data(path.as_str()).await?;
+
+        match self.layout {
+            TimelineLayout::V1Active | TimelineLayout::V1Archived => {
+                // Layout 1: JSON format - return raw bytes as string
+                String::from_utf8(bytes.to_vec()).map_err(|e| {
+                    CoreError::Timeline(format!("Failed to convert JSON bytes to string: {}", e))
+                })
+            }
+            TimelineLayout::V2Active | TimelineLayout::V2Archived => {
+                // Layout 2: Avro format - deserialize then serialize to JSON
+                let metadata = HoodieCommitMetadata::from_avro_bytes(&bytes)?;
+                serde_json::to_string(&metadata).map_err(|e| {
+                    CoreError::Timeline(format!("Failed to serialize metadata to JSON: {}", e))
+                })
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::config::table::HudiTableConfig;
+    use crate::config::HudiConfigs;
+    use std::collections::HashMap;
+
+    /// Build configs holding the test base path plus any `extra` table options.
+    fn configs_with(extra: &[(HudiTableConfig, &str)]) -> Arc<HudiConfigs> {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::BasePath.as_ref().to_string(),
+            "/tmp/test".to_string(),
+        );
+        for (key, value) in extra {
+            options.insert(AsRef::<str>::as_ref(key).to_string(), value.to_string());
+        }
+        Arc::new(HudiConfigs::new(options))
+    }
+
+    /// Build a storage instance (no storage options) on top of `configs`.
+    fn storage_for(configs: &Arc<HudiConfigs>) -> Arc<Storage> {
+        Storage::new(Arc::new(HashMap::new()), configs.clone()).unwrap()
+    }
+
+    #[test]
+    fn test_layout_one_active_directory() {
+        let configs = configs_with(&[]);
+        let storage = storage_for(&configs);
+        let loader = TimelineLoader::new_layout_one_active(configs, storage);
+
+        assert_eq!(loader.get_active_timeline_dir(), HUDI_METADATA_DIR);
+        assert_eq!(loader.get_timeline_dir(), HUDI_METADATA_DIR);
+    }
+
+    #[test]
+    fn test_layout_one_archived_directory() {
+        let configs = configs_with(&[]);
+        let storage = storage_for(&configs);
+        let loader = TimelineLoader::new_layout_one_archived(configs, storage);
+
+        // With no override, the default archive folder is used
+        let default_archive_dir = ".hoodie/archived";
+        assert_eq!(loader.get_archived_timeline_dir(), default_archive_dir);
+        assert_eq!(loader.get_timeline_dir(), default_archive_dir);
+    }
+
+    #[test]
+    fn test_layout_two_active_directory() {
+        let configs = configs_with(&[]);
+        let storage = storage_for(&configs);
+        let loader = TimelineLoader::new_layout_two_active(configs, storage);
+
+        // With no override, the default timeline path is used
+        let default_timeline_dir = format!("{}/timeline", HUDI_METADATA_DIR);
+        assert_eq!(loader.get_active_timeline_dir(), default_timeline_dir);
+        assert_eq!(loader.get_timeline_dir(), default_timeline_dir);
+    }
+
+    #[test]
+    fn test_layout_two_archived_directory() {
+        let configs = configs_with(&[]);
+        let storage = storage_for(&configs);
+        let loader = TimelineLoader::new_layout_two_archived(configs, storage);
+
+        // With no override, the default timeline and history paths are used
+        let default_history_dir = format!("{}/timeline/history", HUDI_METADATA_DIR);
+        assert_eq!(loader.get_archived_timeline_dir(), default_history_dir);
+        assert_eq!(loader.get_timeline_dir(), default_history_dir);
+    }
+
+    #[test]
+    fn test_custom_archive_folder() {
+        let configs = configs_with(&[(
+            HudiTableConfig::ArchiveLogFolder,
+            ".hoodie/custom_archive",
+        )]);
+        let storage = storage_for(&configs);
+        let loader = TimelineLoader::new_layout_one_archived(configs, storage);
+
+        assert_eq!(loader.get_archived_timeline_dir(), ".hoodie/custom_archive");
+    }
+
+    #[test]
+    fn test_custom_timeline_paths() {
+        let configs = configs_with(&[
+            (HudiTableConfig::TimelinePath, "custom_timeline"),
+            (HudiTableConfig::TimelineHistoryPath, "custom_history"),
+        ]);
+        let storage = storage_for(&configs);
+
+        let active_loader = TimelineLoader::new_layout_two_active(configs.clone(), storage.clone());
+        assert_eq!(
+            active_loader.get_active_timeline_dir(),
+            format!("{}/custom_timeline", HUDI_METADATA_DIR)
+        );
+
+        let archived_loader = TimelineLoader::new_layout_two_archived(configs, storage);
+        assert_eq!(
+            archived_loader.get_archived_timeline_dir(),
+            format!("{}/custom_timeline/custom_history", HUDI_METADATA_DIR)
+        );
+    }
+
+    #[test]
+    fn test_layout_type_checks() {
+        let configs = configs_with(&[]);
+        let storage = storage_for(&configs);
+
+        let active = TimelineLoader::new_layout_two_active(configs.clone(), storage.clone());
+        assert!(active.is_layout_two_active());
+        assert!(!active.is_layout_two_archived());
+
+        let archived = TimelineLoader::new_layout_two_archived(configs, storage);
+        assert!(!archived.is_layout_two_active());
+        assert!(archived.is_layout_two_archived());
+    }
+
+    #[test]
+    fn test_storage_access() {
+        let configs = configs_with(&[]);
+        let storage = storage_for(&configs);
+        let expected_ptr = Arc::as_ptr(&storage);
+
+        let loader = TimelineLoader::new_layout_one_active(configs, storage);
+
+        // The loader must hand back the very same storage instance it was given
+        assert_eq!(Arc::as_ptr(loader.storage()), expected_ptr);
+    }
+}
diff --git a/crates/core/src/timeline/lsm_tree.rs
b/crates/core/src/timeline/lsm_tree.rs
new file mode 100644
index 0000000..276dc51
--- /dev/null
+++ b/crates/core/src/timeline/lsm_tree.rs
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config::table::HudiTableConfig::{TimelineHistoryPath, TimelinePath};
+use crate::error::CoreError;
+use crate::metadata::HUDI_METADATA_DIR;
+use crate::storage::Storage;
+use crate::Result;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+
+/// Manifest of the LSM timeline history, deserialized from a `manifest_{version}`
+/// file in the history directory.
+///
+/// NOTE(review): the serde field names here must match the JSON keys of the
+/// manifest files actually written by Hudi - confirm against a real manifest.
+#[cfg(not(tarpaulin_include))]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TimelineManifest {
+ /// Manifest version.
+ pub version: i64,
+ /// Entries listed by this manifest.
+ pub entries: Vec<ManifestEntry>,
+}
+
+/// Entry in an LSM timeline manifest.
+///
+/// Each entry describes a compacted timeline file covering a time range
+/// in the LSM history directory.
+#[cfg(not(tarpaulin_include))]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ManifestEntry {
+ /// Relative path of the compacted timeline file under history.
+ pub file_name: String,
+ /// Smallest instant timestamp covered by the file.
+ pub min_instant: String,
+ /// Largest instant timestamp covered by the file.
+ pub max_instant: String,
+ /// LSM level where this file resides.
+ pub level: i32,
+ /// Size in bytes of the file.
+ pub file_size: i64,
+}
+
+/// LSM tree for v8+ timeline history management.
+///
+/// The paths are resolved from configs on-the-fly via `hoodie.timeline.path`
+/// and `hoodie.timeline.history.path`.
+#[cfg(not(tarpaulin_include))]
+#[derive(Debug, Clone)]
+pub struct LSMTree {
+    /// Storage used to read the history files; also the source of the configs.
+    storage: Arc<Storage>,
+}
+
+#[cfg(not(tarpaulin_include))]
+impl LSMTree {
+    /// Create an LSM tree reader backed by `storage`.
+    pub fn new(storage: Arc<Storage>) -> Self {
+        Self { storage }
+    }
+
+    /// Returns the timeline directory path, resolved from configs.
+    pub fn timeline_dir(&self) -> String {
+        let timeline_path: String = self
+            .storage
+            .hudi_configs
+            .get_or_default(TimelinePath)
+            .into();
+        format!("{}/{}", HUDI_METADATA_DIR, timeline_path)
+    }
+
+    /// Returns the history directory path, resolved from configs.
+    ///
+    /// The history directory lives directly under the timeline directory.
+    pub fn history_dir(&self) -> String {
+        let history_path: String = self
+            .storage
+            .hudi_configs
+            .get_or_default(TimelineHistoryPath)
+            .into();
+        format!("{}/{}", self.timeline_dir(), history_path)
+    }
+
+    /// Read the current manifest of the LSM history.
+    ///
+    /// The `_version_` file in the history directory names the current manifest
+    /// version; the manifest itself is read from `manifest_{version}` and parsed as JSON.
+    ///
+    /// Returns `Ok(None)` when the `_version_` file cannot be read (the cause is logged
+    /// at debug level so real I/O failures are not lost silently).
+    ///
+    /// # Errors
+    ///
+    /// Returns `CoreError::Timeline` when the version file is not valid UTF-8, does not
+    /// hold an integer, or the manifest is not valid JSON; propagates the storage error
+    /// when the manifest file itself cannot be read.
+    pub async fn read_manifest(&self) -> Result<Option<TimelineManifest>> {
+        let history_dir = self.history_dir();
+        let version_path = format!("{}/_version_", history_dir);
+
+        let version_data = match self.storage.get_file_data(&version_path).await {
+            Ok(data) => data,
+            Err(e) => {
+                // Best effort: treat an unreadable version file as "no manifest".
+                log::debug!(
+                    "No LSM timeline manifest read; failed to load {}: {:?}",
+                    version_path,
+                    e
+                );
+                return Ok(None);
+            }
+        };
+
+        // Borrow the bytes as text; no copy is needed just to parse an integer.
+        let version_str = std::str::from_utf8(&version_data).map_err(|e| {
+            CoreError::Timeline(format!(
+                "Invalid UTF-8 in manifest version file {}: {}",
+                version_path, e
+            ))
+        })?;
+        let version = version_str.trim().parse::<i64>().map_err(|e| {
+            CoreError::Timeline(format!(
+                "Invalid manifest version in {}: {}",
+                version_path, e
+            ))
+        })?;
+
+        let manifest_path = format!("{}/manifest_{}", history_dir, version);
+        let manifest_data = self.storage.get_file_data(&manifest_path).await?;
+        let manifest: TimelineManifest = serde_json::from_slice(&manifest_data).map_err(|e| {
+            CoreError::Timeline(format!(
+                "Failed to parse manifest {}: {}",
+                manifest_path, e
+            ))
+        })?;
+
+        Ok(Some(manifest))
+    }
+}
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 094c07d..f916f11 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -16,7 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
+pub mod builder;
pub mod instant;
+pub mod loader;
+pub mod lsm_tree;
pub(crate) mod selector;
pub(crate) mod util;
@@ -24,17 +27,18 @@ use crate::config::HudiConfigs;
use crate::error::CoreError;
use crate::file_group::builder::{build_file_groups,
build_replaced_file_groups, FileGroupMerger};
use crate::file_group::FileGroup;
-use crate::metadata::HUDI_METADATA_DIR;
use crate::schema::resolver::{
resolve_avro_schema_from_commit_metadata,
resolve_schema_from_commit_metadata,
};
use crate::storage::Storage;
+use crate::timeline::builder::TimelineBuilder;
use crate::timeline::instant::Action;
+use crate::timeline::loader::TimelineLoader;
use crate::timeline::selector::TimelineSelector;
use crate::Result;
use arrow_schema::Schema;
use instant::Instant;
-use log::debug;
+
use serde_json::{Map, Value};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
@@ -46,6 +50,8 @@ use std::sync::Arc;
pub struct Timeline {
hudi_configs: Arc<HudiConfigs>,
pub(crate) storage: Arc<Storage>,
+ active_loader: TimelineLoader,
+ archived_loader: Option<TimelineLoader>,
pub completed_commits: Vec<Instant>,
}
@@ -54,18 +60,19 @@ pub const DEFAULT_LOADING_ACTIONS: &[Action] =
&[Action::Commit, Action::DeltaCommit, Action::ReplaceCommit];
impl Timeline {
- #[cfg(test)]
- pub(crate) async fn new_from_completed_commits(
+ pub(crate) fn new(
hudi_configs: Arc<HudiConfigs>,
- storage_options: Arc<HashMap<String, String>>,
- completed_commits: Vec<Instant>,
- ) -> Result<Self> {
- let storage = Storage::new(storage_options.clone(),
hudi_configs.clone())?;
- Ok(Self {
+ storage: Arc<Storage>,
+ active_loader: TimelineLoader,
+ archived_loader: Option<TimelineLoader>,
+ ) -> Self {
+ Self {
hudi_configs,
storage,
- completed_commits,
- })
+ active_loader,
+ archived_loader,
+ completed_commits: Vec::new(),
+ }
}
pub(crate) async fn new_from_storage(
@@ -73,58 +80,66 @@ impl Timeline {
storage_options: Arc<HashMap<String, String>>,
) -> Result<Self> {
let storage = Storage::new(storage_options.clone(),
hudi_configs.clone())?;
+ let mut timeline = TimelineBuilder::new(hudi_configs,
storage).build().await?;
let selector = TimelineSelector::completed_actions_in_range(
DEFAULT_LOADING_ACTIONS,
- hudi_configs.clone(),
+ timeline.hudi_configs.clone(),
None,
None,
)?;
- let completed_commits = Self::load_instants(&selector, &storage,
false).await?;
- Ok(Self {
- hudi_configs,
- storage,
- completed_commits,
- })
+ timeline.completed_commits = timeline.load_instants(&selector,
false).await?;
+ Ok(timeline)
}
- async fn load_instants(
+ /// Load instants from the timeline based on the selector criteria.
+ ///
+ /// # Archived Timeline Loading
+ ///
+ /// Archived instants are loaded only when BOTH conditions are met:
+ /// 1. The selector has a time filter (start or end timestamp)
+ /// 2. An archived loader is present (`TimelineArchivedReadEnabled` config is set to `true`)
+ ///
+ /// This double-gate design ensures:
+ /// - Queries without time filters only read active timeline (optimization)
+ /// - Historical time-range queries can include archived data when explicitly enabled
+ ///
+ /// # Ordering
+ ///
+ /// The returned instants are sorted as one list, ascending by default or descending
+ /// when `desc` is true, also when active and archived instants are combined.
+ ///
+ /// # Arguments
+ ///
+ /// * `selector` - The criteria for selecting instants (actions, states, time range)
+ /// * `desc` - If true, return instants in descending order by timestamp
+ pub async fn load_instants(
+ &self,
selector: &TimelineSelector,
- storage: &Storage,
desc: bool,
) -> Result<Vec<Instant>> {
- let files = storage.list_files(Some(HUDI_METADATA_DIR)).await?;
-
- // For most cases, we load completed instants, so we can pre-allocate the vector with a
- // capacity of 1/3 of the total number of listed files,
- // ignoring requested and inflight instants.
- let mut instants = Vec::with_capacity(files.len() / 3);
-
- for file_info in files {
- match selector.try_create_instant(file_info.name.as_str()) {
- Ok(instant) => instants.push(instant),
- Err(e) => {
- // Ignore files that are not valid or desired instants.
- debug!(
- "Instant not created from file {:?} due to: {:?}",
- file_info, e
- );
+ // If a time filter is present and we have an archived loader, include archived as well.
+ if selector.has_time_filter() {
+ let mut instants = self.active_loader.load_instants(selector, desc).await?;
+ if let Some(archived_loader) = &self.archived_loader {
+ let mut archived = archived_loader
+ .load_archived_instants(selector, desc)
+ .await?;
+ if !archived.is_empty() {
+ // Each side is sorted on its own, but plain concatenation is only ordered
+ // for one direction. Re-sort the combined list so it honors `desc`.
+ instants.append(&mut archived);
+ instants.sort_unstable();
+ if desc {
+ instants.reverse();
+ }
}
}
- }
-
- instants.sort_unstable();
-
- // As of current impl., we don't mutate instants once timeline is created,
- // so we can save some memory by shrinking the capacity.
- instants.shrink_to_fit();
-
- if desc {
- Ok(instants.into_iter().rev().collect())
- } else {
Ok(instants)
+ } else {
+ self.active_loader.load_instants(selector, desc).await
}
}
+ /// Load instants from the active timeline only.
+ ///
+ /// Delegates to the active loader; the archived loader is not consulted here.
+ async fn load_instants_internal(
+ &self,
+ selector: &TimelineSelector,
+ desc: bool,
+ ) -> Result<Vec<Instant>> {
+ // For now, just load active. Archived support will be added internally later
+ // based on selector ranges.
+ self.active_loader.load_instants(selector, desc).await
+ }
+
/// Get the completed commit [Instant]s in the timeline.
///
/// * For Copy-on-write tables, this includes commit instants.
@@ -136,7 +151,7 @@ impl Timeline {
pub async fn get_completed_commits(&self, desc: bool) ->
Result<Vec<Instant>> {
let selector =
TimelineSelector::completed_commits_in_range(self.hudi_configs.clone(), None,
None)?;
- Self::load_instants(&selector, &self.storage, desc).await
+ self.load_instants_internal(&selector, desc).await
}
/// Get the completed deltacommit [Instant]s in the timeline.
@@ -152,7 +167,7 @@ impl Timeline {
None,
None,
)?;
- Self::load_instants(&selector, &self.storage, desc).await
+ self.load_instants_internal(&selector, desc).await
}
/// Get the completed replacecommit [Instant]s in the timeline.
@@ -166,7 +181,7 @@ impl Timeline {
None,
None,
)?;
- Self::load_instants(&selector, &self.storage, desc).await
+ self.load_instants_internal(&selector, desc).await
}
/// Get the completed clustering commit [Instant]s in the timeline.
@@ -180,7 +195,7 @@ impl Timeline {
None,
None,
)?;
- let instants = Self::load_instants(&selector, &self.storage,
desc).await?;
+ let instants = self.load_instants_internal(&selector, desc).await?;
let mut clustering_instants = Vec::new();
for instant in instants {
let metadata = self.get_instant_metadata(&instant).await?;
@@ -198,19 +213,14 @@ impl Timeline {
}
async fn get_instant_metadata(&self, instant: &Instant) ->
Result<Map<String, Value>> {
- let path = instant.relative_path()?;
- let bytes = self.storage.get_file_data(path.as_str()).await?;
-
- serde_json::from_slice(&bytes)
- .map_err(|e| CoreError::Timeline(format!("Failed to get commit
metadata: {}", e)))
+ self.active_loader.load_instant_metadata(instant).await
}
/// Get the instant metadata in JSON format.
pub async fn get_instant_metadata_in_json(&self, instant: &Instant) ->
Result<String> {
- let path = instant.relative_path()?;
- let bytes = self.storage.get_file_data(path.as_str()).await?;
- String::from_utf8(bytes.to_vec())
- .map_err(|e| CoreError::Timeline(format!("Failed to get commit
metadata: {}", e)))
+ self.active_loader
+ .load_instant_metadata_as_json(instant)
+ .await
}
pub(crate) async fn get_latest_commit_metadata(&self) ->
Result<Map<String, Value>> {
@@ -322,14 +332,102 @@ mod tests {
use crate::config::table::HudiTableConfig;
use crate::metadata::meta_field::MetaField;
+ use crate::timeline::instant::{Action, State};
+    /// A v8 non-partitioned table loads through the Layout Two active loader.
+    #[tokio::test]
+    async fn test_timeline_v8_nonpartitioned() {
+        let timeline = create_test_timeline(SampleTable::V8Nonpartitioned.url_to_cow()).await;
+
+        // No archived loader is expected: TimelineArchivedReadEnabled is false by default
+        assert!(timeline.archived_loader.is_none());
+        assert!(timeline.active_loader.is_layout_two_active());
+        assert_eq!(timeline.completed_commits.len(), 2);
+    }
+
+    /// With `TimelineArchivedReadEnabled` set, the builder also creates a
+    /// Layout Two archived loader for a v8 table.
+    #[tokio::test]
+    async fn test_timeline_v8_with_archived_enabled() {
+        use crate::config::internal::HudiInternalConfig::TimelineArchivedReadEnabled;
+
+        let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+
+        // Seed options: base path plus the archived-read switch
+        let mut options_map = HashMap::new();
+        options_map.insert(
+            HudiTableConfig::BasePath.as_ref().to_string(),
+            base_url.to_string(),
+        );
+        options_map.insert(
+            TimelineArchivedReadEnabled.as_ref().to_string(),
+            "true".to_string(),
+        );
+
+        let seed_configs = Arc::new(HudiConfigs::new(options_map.clone()));
+        let storage = Storage::new(Arc::new(HashMap::new()), seed_configs).unwrap();
+
+        // Merge in the table's own properties read from storage
+        let properties_data = storage
+            .get_file_data(".hoodie/hoodie.properties")
+            .await
+            .unwrap();
+        let table_properties =
+            crate::config::util::parse_data_for_options(&properties_data, "=").unwrap();
+        options_map.extend(table_properties);
+        let hudi_configs = Arc::new(HudiConfigs::new(options_map));
+
+        let timeline = TimelineBuilder::new(hudi_configs, storage)
+            .build()
+            .await
+            .unwrap();
+
+        // When TimelineArchivedReadEnabled is true, archived loader should be created
+        assert!(timeline.active_loader.is_layout_two_active());
+        assert!(matches!(
+            &timeline.archived_loader,
+            Some(loader) if loader.is_layout_two_archived()
+        ));
+    }
async fn create_test_timeline(base_url: Url) -> Timeline {
- Timeline::new_from_storage(
- Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath,
base_url)])),
+ let storage = Storage::new(
Arc::new(HashMap::new()),
+ Arc::new(HudiConfigs::new([(
+ HudiTableConfig::BasePath,
+ base_url.to_string(),
+ )])),
)
- .await
- .unwrap()
+ .unwrap();
+
+ let hudi_configs = HudiConfigs::new([(HudiTableConfig::BasePath,
base_url.to_string())]);
+ let table_properties = crate::config::util::parse_data_for_options(
+ &storage
+ .get_file_data(".hoodie/hoodie.properties")
+ .await
+ .unwrap(),
+ "=",
+ )
+ .unwrap();
+ let mut hudi_configs_map = hudi_configs.as_options();
+ hudi_configs_map.extend(table_properties);
+ let hudi_configs = Arc::new(HudiConfigs::new(hudi_configs_map));
+
+ let mut timeline = TimelineBuilder::new(hudi_configs, storage)
+ .build()
+ .await
+ .unwrap();
+
+ let selector = TimelineSelector::completed_actions_in_range(
+ DEFAULT_LOADING_ACTIONS,
+ timeline.hudi_configs.clone(),
+ None,
+ None,
+ )
+ .unwrap();
+ timeline.completed_commits = timeline.load_instants(&selector,
false).await.unwrap();
+ timeline
}
#[tokio::test]
@@ -385,7 +483,12 @@ mod tests {
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, CoreError::Timeline(_)));
- assert!(err.to_string().contains("Failed to get commit metadata"));
+ // Error message changed to be more specific about JSON parsing
+ assert!(
+ err.to_string()
+ .contains("Failed to parse JSON commit metadata")
+ || err.to_string().contains("EOF while parsing")
+ );
let instant = Instant::from_str("20240402144910683.commit").unwrap();
@@ -394,7 +497,12 @@ mod tests {
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, CoreError::Timeline(_)));
- assert!(err.to_string().contains("Failed to get commit metadata"));
+ // Error message changed to be more specific about JSON parsing
+ assert!(
+ err.to_string()
+ .contains("Failed to parse JSON commit metadata")
+ || err.to_string().contains("expected value")
+ );
}
#[tokio::test]
@@ -498,4 +606,97 @@ mod tests {
);
}
}
+
+ #[tokio::test]
+ async fn test_get_completed_commits() {
+ let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+ let timeline = create_test_timeline(base_url).await;
+
+ let commits = timeline.get_completed_commits(false).await.unwrap();
+ assert!(!commits.is_empty());
+ // All should be commits in completed state
+ for instant in &commits {
+ assert_eq!(instant.action, Action::Commit);
+ assert_eq!(instant.state, State::Completed);
+ }
+ }
+
+ #[tokio::test]
+ async fn test_get_completed_deltacommits() {
+ let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+ let timeline = create_test_timeline(base_url).await;
+
+ let deltacommits =
timeline.get_completed_deltacommits(false).await.unwrap();
+ // All should be deltacommits (or empty if none exist)
+ for instant in &deltacommits {
+ assert_eq!(instant.action, Action::DeltaCommit);
+ assert_eq!(instant.state, State::Completed);
+ }
+ }
+
+ #[tokio::test]
+ async fn test_get_completed_replacecommits() {
+ let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+ let timeline = create_test_timeline(base_url).await;
+
+ let replacecommits =
timeline.get_completed_replacecommits(false).await.unwrap();
+ // All should be replacecommits (or empty if none exist)
+ for instant in &replacecommits {
+ assert!(instant.action.is_replacecommit());
+ assert_eq!(instant.state, State::Completed);
+ }
+ }
+
+ #[tokio::test]
+ async fn test_get_commits_descending_order() {
+ let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+ let timeline = create_test_timeline(base_url).await;
+
+ let commits_asc = timeline.get_completed_commits(false).await.unwrap();
+ let commits_desc = timeline.get_completed_commits(true).await.unwrap();
+
+ assert_eq!(commits_asc.len(), commits_desc.len());
+ if !commits_asc.is_empty() {
+ // Verify descending order is reverse of ascending
+ assert_eq!(commits_asc.first(), commits_desc.last());
+ assert_eq!(commits_asc.last(), commits_desc.first());
+ }
+ }
+
+ #[tokio::test]
+ async fn test_get_instant_metadata_in_json() {
+ let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+ let timeline = create_test_timeline(base_url).await;
+
+ let commits = timeline.get_completed_commits(false).await.unwrap();
+ if let Some(instant) = commits.first() {
+ let json = timeline
+ .get_instant_metadata_in_json(instant)
+ .await
+ .unwrap();
+ // Should be valid JSON
+ assert!(serde_json::from_str::<serde_json::Value>(&json).is_ok());
+ }
+ }
+
+ #[tokio::test]
+ async fn test_get_latest_commit_timestamp() {
+ let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+ let timeline = create_test_timeline(base_url).await;
+
+ let timestamp = timeline.get_latest_commit_timestamp().unwrap();
+ assert!(!timestamp.is_empty());
+ // Should be in timeline timestamp format
+ assert!(timestamp.len() >= 14);
+ }
+
+ #[tokio::test]
+ async fn test_get_latest_commit_timestamp_as_option() {
+ let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+ let timeline = create_test_timeline(base_url).await;
+
+ let timestamp = timeline.get_latest_commit_timestamp_as_option();
+ assert!(timestamp.is_some());
+ assert!(!timestamp.unwrap().is_empty());
+ }
}
diff --git a/crates/core/src/timeline/selector.rs
b/crates/core/src/timeline/selector.rs
index 8dca297..cb98378 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -142,6 +142,10 @@ pub struct TimelineSelector {
states: Vec<State>,
actions: Vec<Action>,
include_archived: bool,
+ /// Timeline layout version determines instant format validation:
+ /// - Layout 1 (pre-v8): expects `{timestamp}.{action}` for completed
instants
+ /// - Layout 2 (v8+): expects
`{requestedTimestamp}_{completedTimestamp}.{action}` for completed instants
+ timeline_layout_version: isize,
}
#[allow(dead_code)]
@@ -152,6 +156,25 @@ impl TimelineSelector {
.into()
}
+ fn get_timeline_layout_version_from_configs(hudi_configs: &HudiConfigs) ->
isize {
+ // Try to get layout version from config, otherwise infer from table
version
+ if let Some(layout_version) =
hudi_configs.try_get(HudiTableConfig::TimelineLayoutVersion) {
+ layout_version.into()
+ } else {
+ // Apply same default logic as TimelineBuilder:
+ // v8+ tables default to layout 2, earlier versions default to
layout 1
+ let table_version: isize = hudi_configs
+ .try_get(HudiTableConfig::TableVersion)
+ .map(|v| v.into())
+ .unwrap_or(6); // Conservative default if table version is
somehow missing
+ if table_version >= 8 {
+ 2
+ } else {
+ 1
+ }
+ }
+ }
+
fn parse_datetime(timezone: &str, timestamp: Option<&str>) ->
Result<Option<DateTime<Utc>>> {
timestamp
.map(|e| Instant::parse_datetime(e, timezone))
@@ -165,6 +188,7 @@ impl TimelineSelector {
end: Option<&str>,
) -> Result<Self> {
let timezone = Self::get_timezone_from_configs(&hudi_configs);
+ let timeline_layout_version =
Self::get_timeline_layout_version_from_configs(&hudi_configs);
let start_datetime = Self::parse_datetime(&timezone, start)?;
let end_datetime = Self::parse_datetime(&timezone, end)?;
Ok(Self {
@@ -174,6 +198,7 @@ impl TimelineSelector {
states: vec![State::Completed],
actions: actions.to_vec(),
include_archived: false,
+ timeline_layout_version,
})
}
@@ -201,6 +226,11 @@ impl TimelineSelector {
Self::completed_actions_in_range(&[Action::ReplaceCommit],
hudi_configs, start, end)
}
+ /// Whether the selector has any time filter (start or end) applied.
+ pub fn has_time_filter(&self) -> bool {
+ self.start_datetime.is_some() || self.end_datetime.is_some()
+ }
+
pub fn should_include_action(&self, action: &Action) -> bool {
self.actions.is_empty() || self.actions.contains(action)
}
@@ -210,7 +240,7 @@ impl TimelineSelector {
}
pub fn try_create_instant(&self, file_name: &str) -> Result<Instant> {
- let (timestamp, action_suffix) =
file_name.split_once('.').ok_or_else(|| {
+ let (timestamp_part, action_suffix) =
file_name.split_once('.').ok_or_else(|| {
CoreError::Timeline(format!(
"Instant not created due to invalid file name: {file_name}"
))
@@ -230,6 +260,40 @@ impl TimelineSelector {
)));
}
+ // Handle v8+ completed instant format:
{requestedTimestamp}_{completedTimestamp}.{action}
+ // Validate format based on timeline layout version and instant state
+ let (timestamp, completed_timestamp) = if let Some((requested_ts,
completed_ts)) =
+ timestamp_part.split_once('_')
+ {
+ // Found underscore format - this should be a v8+ (layout 2)
completed instant
+ if self.timeline_layout_version == 1 && state == State::Completed {
+ return Err(CoreError::Timeline(format!(
+ "Unexpected v8+ instant format in timeline layout v1:
{file_name}"
+ )));
+ }
+
+ // Validate both timestamps
+ if requested_ts.len() != 17 && requested_ts.len() != 14 {
+ return Err(CoreError::Timeline(format!(
+ "Invalid requested timestamp in v8+ format: {file_name}"
+ )));
+ }
+ if completed_ts.len() != 17 && completed_ts.len() != 14 {
+ return Err(CoreError::Timeline(format!(
+ "Invalid completed timestamp in v8+ format: {file_name}"
+ )));
+ }
+ (requested_ts, Some(completed_ts.to_string()))
+ } else {
+ // No underscore format - this should be a pre-v8 instant OR a
non-completed v8+ instant
+ if self.timeline_layout_version == 2 && state == State::Completed {
+ return Err(CoreError::Timeline(format!(
+ "Expected v8+ instant format (with completed timestamp) in
timeline layout v2 for completed instant: {file_name}"
+ )));
+ }
+ (timestamp_part, None)
+ };
+
let dt = Instant::parse_datetime(timestamp, &self.timezone)?;
if let Some(start) = self.start_datetime {
if dt < start {
@@ -251,6 +315,7 @@ impl TimelineSelector {
Ok(Instant {
timestamp: timestamp.to_string(),
+ completed_timestamp,
epoch_millis: dt.timestamp_millis(),
action,
state,
@@ -306,6 +371,11 @@ mod tests {
use super::*;
use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
+ use crate::storage::Storage;
+ use crate::timeline::builder::TimelineBuilder;
+ use crate::timeline::instant::{Action, Instant, State};
+ use crate::timeline::Timeline;
+ use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
@@ -470,6 +540,7 @@ mod tests {
states: states.to_vec(),
actions: actions.to_vec(),
include_archived: false,
+ timeline_layout_version: 1, // Default to layout v1 for tests
}
}
@@ -523,7 +594,25 @@ mod tests {
}
async fn create_test_timeline() -> Timeline {
- let instants = vec![
+ let storage = Storage::new(
+ Arc::new(HashMap::new()),
+ Arc::new(HudiConfigs::new([
+ (HudiTableConfig::BasePath, "file:///tmp/base"),
+ (HudiTableConfig::TableVersion, "6"),
+ ])),
+ )
+ .unwrap();
+ let mut timeline = TimelineBuilder::new(
+ Arc::new(HudiConfigs::new([
+ (HudiTableConfig::BasePath, "file:///tmp/base"),
+ (HudiTableConfig::TableVersion, "6"),
+ ])),
+ storage,
+ )
+ .build()
+ .await
+ .unwrap();
+ timeline.completed_commits = vec![
Instant::from_str("20240103153000.commit").unwrap(),
Instant::from_str("20240103153010999.commit").unwrap(),
Instant::from_str("20240103153020999.commit.requested").unwrap(),
@@ -531,16 +620,7 @@ mod tests {
Instant::from_str("20240103153020999.commit").unwrap(),
Instant::from_str("20240103153030999.commit").unwrap(),
];
- Timeline::new_from_completed_commits(
- Arc::new(HudiConfigs::new([(
- HudiTableConfig::BasePath,
- "file:///tmp/base",
- )])),
- Arc::new(HashMap::new()),
- instants,
- )
- .await
- .unwrap()
+ timeline
}
#[tokio::test]
@@ -555,6 +635,7 @@ mod tests {
end_datetime: None,
timezone: "UTC".to_string(),
include_archived: false,
+ timeline_layout_version: 1,
};
assert!(selector.select(&timeline).unwrap().is_empty());
}
@@ -570,9 +651,76 @@ mod tests {
end_datetime: end.map(|s| Instant::parse_datetime(s,
"UTC").unwrap()),
timezone: "UTC".to_string(),
include_archived: false,
+ timeline_layout_version: 1,
}
}
+ #[test]
+ fn test_layout_version_validation() {
+ // Test layout v1 - should reject v8+ format for completed instants
+ let selector_v1 = TimelineSelector {
+ timezone: "UTC".to_string(),
+ start_datetime: None,
+ end_datetime: None,
+ states: vec![State::Completed],
+ actions: vec![Action::DeltaCommit],
+ include_archived: false,
+ timeline_layout_version: 1,
+ };
+
+ // v8+ format should be rejected for layout v1
+ let result =
selector_v1.try_create_instant("20240103153000_20240103153001.deltacommit");
+ assert!(result.is_err());
+ assert!(result
+ .unwrap_err()
+ .to_string()
+ .contains("Unexpected v8+ instant format in timeline layout v1"));
+
+ // pre-v8 format should work for layout v1
+ assert!(selector_v1
+ .try_create_instant("20240103153000.deltacommit")
+ .is_ok());
+
+ // Test layout v2 - should reject pre-v8 format for completed instants
+ let selector_v2 = TimelineSelector {
+ timezone: "UTC".to_string(),
+ start_datetime: None,
+ end_datetime: None,
+ states: vec![State::Completed],
+ actions: vec![Action::DeltaCommit],
+ include_archived: false,
+ timeline_layout_version: 2,
+ };
+
+ // pre-v8 format should be rejected for layout v2 completed instants
+ let result =
selector_v2.try_create_instant("20240103153000.deltacommit");
+ assert!(result.is_err());
+ assert!(result
+ .unwrap_err()
+ .to_string()
+ .contains("Expected v8+ instant format"));
+
+ // v8+ format should work for layout v2
+ assert!(selector_v2
+ .try_create_instant("20240103153000_20240103153001.deltacommit")
+ .is_ok());
+
+ // Non-completed instants (inflight, requested) should work with
standard format in both layouts
+ let selector_v2_inflight = TimelineSelector {
+ timezone: "UTC".to_string(),
+ start_datetime: None,
+ end_datetime: None,
+ states: vec![State::Inflight],
+ actions: vec![Action::DeltaCommit],
+ include_archived: false,
+ timeline_layout_version: 2,
+ };
+
+ assert!(selector_v2_inflight
+ .try_create_instant("20240103153000.deltacommit.inflight")
+ .is_ok());
+ }
+
#[tokio::test]
async fn test_timestamp_filtering() -> Result<()> {
let timeline = create_test_timeline().await;
diff --git a/crates/core/tests/data/commit_metadata/v6_commit.json
b/crates/core/tests/data/commit_metadata/v6_commit.json
new file mode 100644
index 0000000..511619d
--- /dev/null
+++ b/crates/core/tests/data/commit_metadata/v6_commit.json
@@ -0,0 +1,38 @@
+{
+ "partitionToWriteStats" : {
+ "" : [ {
+ "fileId" : "a079bdb3-731c-4894-b855-abfcd6921007-0",
+ "path" :
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
+ "cdcStats" : null,
+ "prevCommit" : "20240418173550988",
+ "numWrites" : 4,
+ "numDeletes" : 0,
+ "numUpdateWrites" : 1,
+ "numInserts" : 1,
+ "totalWriteBytes" : 441520,
+ "totalWriteErrors" : 0,
+ "tempPath" : null,
+ "partitionPath" : "",
+ "totalLogRecords" : 0,
+ "totalLogFilesCompacted" : 0,
+ "totalLogSizeCompacted" : 0,
+ "totalUpdatedRecordsCompacted" : 0,
+ "totalLogBlocks" : 0,
+ "totalCorruptLogBlock" : 0,
+ "totalRollbackBlocks" : 0,
+ "fileSizeInBytes" : 441520,
+ "minEventTime" : null,
+ "maxEventTime" : null,
+ "runtimeStats" : {
+ "totalScanTime" : 0,
+ "totalUpsertTime" : 105,
+ "totalCreateTime" : 0
+ }
+ } ]
+ },
+ "compacted" : false,
+ "extraMetadata" : {
+ "schema" :
"{\"type\":\"record\",\"name\":\"v6_nonpartitioned_record\",\"namespace\":\"hoodie.v6_nonpartitioned\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"isActive\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"byteField\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"shortField\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"intFi
[...]
+ },
+ "operationType" : "UPSERT"
+}
\ No newline at end of file
diff --git a/crates/core/tests/data/commit_metadata/v8_deltacommit.avro
b/crates/core/tests/data/commit_metadata/v8_deltacommit.avro
new file mode 100644
index 0000000..d8000be
Binary files /dev/null and
b/crates/core/tests/data/commit_metadata/v8_deltacommit.avro differ
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/hoodie.properties
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/hoodie.properties
new file mode 100644
index 0000000..dd1798c
--- /dev/null
+++
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/hoodie.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.table.name=commits_load_schema_from_base_file_cow
+hoodie.table.type=COPY_ON_WRITE
+hoodie.table.version=6
+hoodie.table.timeline.layout.version=1
diff --git
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/hoodie.properties
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/hoodie.properties
new file mode 100644
index 0000000..51853f9
--- /dev/null
+++
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/hoodie.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.table.name=commits_load_schema_from_base_file_mor
+hoodie.table.type=MERGE_ON_READ
+hoodie.table.version=6
+hoodie.table.timeline.layout.version=1
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 9ee6094..662548b 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -54,26 +54,29 @@ use hudi_core::table::Table as HudiTable;
///
/// # Examples
///
-/// ```rust
+/// ```rust,no_run
/// use std::sync::Arc;
///
/// use datafusion::error::Result;
/// use datafusion::prelude::{DataFrame, SessionContext};
/// use hudi_datafusion::HudiDataSource;
///
-/// // Initialize a new DataFusion session context
-/// let ctx = SessionContext::new();
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+/// // Initialize a new DataFusion session context
+/// let ctx = SessionContext::new();
///
-/// // Create a new HudiDataSource with specific read options
-/// let hudi = HudiDataSource::new_with_options(
-/// "/tmp/trips_table",
-/// [("hoodie.read.input.partitions", 5)]).await?;
-///
-/// // Register the Hudi table with the session context
-/// ctx.register_table("trips_table", Arc::new(hudi))?;
-/// let df: DataFrame = ctx.sql("SELECT * from trips_table where city =
'san_francisco'").await?;
-/// df.show().await?;
+/// // Create a new HudiDataSource with specific read options
+/// let hudi = HudiDataSource::new_with_options(
+/// "/tmp/trips_table",
+/// [("hoodie.read.input.partitions", "5")]).await?;
///
+/// // Register the Hudi table with the session context
+/// ctx.register_table("trips_table", Arc::new(hudi))?;
+/// let df: DataFrame = ctx.sql("SELECT * from trips_table where city =
'san_francisco'").await?;
+/// df.show().await?;
+/// Ok(())
+/// }
/// ```
#[derive(Clone, Debug)]
pub struct HudiDataSource {
@@ -254,30 +257,27 @@ impl TableProvider for HudiDataSource {
///
/// Creating a new `HudiTableFactory` instance:
///
-/// ```rust
+/// ```rust,no_run
/// use datafusion::prelude::SessionContext;
+/// use datafusion::catalog::TableProviderFactory;
+/// use datafusion::sql::parser::CreateExternalTable;
/// use hudi_datafusion::HudiTableFactory;
///
-/// // Initialize a new HudiTableFactory
-/// let factory = HudiTableFactory::new();
+/// #[tokio::main]
+/// async fn main() -> datafusion::error::Result<()> {
+/// // Initialize a new HudiTableFactory
+/// let factory = HudiTableFactory::new();
///
-/// // The factory can be used to create Hudi tables
-/// let table = factory.create_table(...)?;
-///
-///
-/// // Using `CREATE EXTERNAL TABLE` to register Hudi table:
-/// let test_table = factory.create_table(...)?; // Table with path + url
-/// let ctx = SessionContext::new();
-///
-/// // Register table in session using `CREATE EXTERNAL TABLE` command
-/// let create_table_sql = format!(
-/// "CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}",
-/// test_table.as_ref(),
-/// test_table.path(),
-/// concat_as_sql_options(options)
-/// );
-/// ctx.sql(create_table_sql.as_str()).await?; // Query against this table
-///
+/// // Initialize a new DataFusion session context
+/// let ctx = SessionContext::new();
+///
+/// // Register table using SQL command
+/// let create_table_sql =
+/// "CREATE EXTERNAL TABLE trips_table STORED AS HUDI LOCATION
'/tmp/trips_table'";
+/// ctx.sql(create_table_sql).await?;
+///
+/// Ok(())
+/// }
/// ```
#[derive(Debug)]
pub struct HudiTableFactory {}