This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new aaade0f  feat: add table v8+ timeline support for loading old and new 
table versions (#395)
aaade0f is described below

commit aaade0fde42606aa2b2bcf668dc07c13d7e9b835
Author: Sagar Sumit <[email protected]>
AuthorDate: Sun Dec 7 10:30:12 2025 +0530

    feat: add table v8+ timeline support for loading old and new table versions 
(#395)
    
    
    
    ---------
    
    Signed-off-by: Sagar Sumit <[email protected]>
    Co-authored-by: Shiyan Xu <[email protected]>
---
 crates/core/Cargo.toml                             |   3 +
 crates/core/src/config/internal.rs                 |  21 +-
 crates/core/src/config/read.rs                     |   2 +-
 crates/core/src/config/table.rs                    |  20 +
 crates/core/src/file_group/reader.rs               |   8 +-
 crates/core/src/lib.rs                             |   6 +-
 crates/core/src/metadata/commit.rs                 | 230 +++++++++-
 crates/core/src/timeline/builder.rs                | 344 +++++++++++++++
 crates/core/src/timeline/instant.rs                | 181 +++++++-
 crates/core/src/timeline/loader.rs                 | 466 +++++++++++++++++++++
 crates/core/src/timeline/lsm_tree.rs               | 109 +++++
 crates/core/src/timeline/mod.rs                    | 333 ++++++++++++---
 crates/core/src/timeline/selector.rs               | 172 +++++++-
 .../core/tests/data/commit_metadata/v6_commit.json |  38 ++
 .../tests/data/commit_metadata/v8_deltacommit.avro | Bin 0 -> 4483 bytes
 .../.hoodie/hoodie.properties                      |  23 +
 .../.hoodie/hoodie.properties                      |  23 +
 crates/datafusion/src/lib.rs                       |  64 +--
 18 files changed, 1909 insertions(+), 134 deletions(-)

diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index e8bde12..eb958a9 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -27,6 +27,9 @@ description.workspace = true
 homepage.workspace = true
 repository.workspace = true
 
+[lints.rust]
+unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tarpaulin_include)'] }
+
 [dependencies]
 # arrow
 arrow = { workspace = true }
diff --git a/crates/core/src/config/internal.rs 
b/crates/core/src/config/internal.rs
index 61f74c6..45db8b4 100644
--- a/crates/core/src/config/internal.rs
+++ b/crates/core/src/config/internal.rs
@@ -37,18 +37,31 @@ use crate::config::{ConfigParser, HudiConfigValue};
 /// use hudi_core::table::Table as HudiTable;
 ///
 /// let options = [(SkipConfigValidation, "true")];
-/// HudiTable::new_with_options("/tmp/hudi_data", options)
+/// HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
 /// ```
 ///
 #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiInternalConfig {
     SkipConfigValidation,
+    /// Enable reading archived timeline (v1) and LSM history (v2).
+    ///
+    /// When enabled, timeline queries with time range filters will include 
archived instants
+    /// in addition to active instants. When disabled (default), only active 
timeline is read.
+    ///
+    /// Note: Archived instants are only loaded when BOTH conditions are met:
+    /// 1. This config is set to `true`
+    /// 2. The query specifies a time range filter (start or end timestamp)
+    ///
+    /// Queries without time filters (e.g., `get_completed_commits()`) will 
never load
+    /// archived instants, regardless of this setting.
+    TimelineArchivedReadEnabled,
 }
 
 impl AsRef<str> for HudiInternalConfig {
     fn as_ref(&self) -> &str {
         match self {
             Self::SkipConfigValidation => 
"hoodie.internal.skip.config.validation",
+            Self::TimelineArchivedReadEnabled => 
"hoodie.internal.timeline.archived.enabled",
         }
     }
 }
@@ -65,6 +78,7 @@ impl ConfigParser for HudiInternalConfig {
     fn default_value(&self) -> Option<HudiConfigValue> {
         match self {
             Self::SkipConfigValidation => 
Some(HudiConfigValue::Boolean(false)),
+            Self::TimelineArchivedReadEnabled => 
Some(HudiConfigValue::Boolean(false)),
         }
     }
 
@@ -80,6 +94,11 @@ impl ConfigParser for HudiInternalConfig {
                     bool::from_str(v).map_err(|e| ParseBool(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::Boolean),
+            Self::TimelineArchivedReadEnabled => get_result
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| ParseBool(self.key(), 
v.to_string(), e))
+                })
+                .map(HudiConfigValue::Boolean),
         }
     }
 }
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index dae0a44..4aad618 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -37,7 +37,7 @@ use crate::config::{ConfigParser, HudiConfigValue};
 /// use hudi_core::table::Table as HudiTable;
 ///
 /// let options = [(InputPartitions, "2")];
-/// HudiTable::new_with_options("/tmp/hudi_data", options)
+/// HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
 /// ```
 ///
 
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 06f3dec..e3fd273 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -117,6 +117,17 @@ pub enum HudiTableConfig {
     ///
     /// - [`TimelineTimezoneValue`] - Possible values for this configuration.
     TimelineTimezone,
+
+    /// Folder for archived timeline files for layout v1 (default: 
.hoodie/archived)
+    ArchiveLogFolder,
+
+    /// Path for timeline directory for layout v2 (v8+), relative to .hoodie/ 
(default: timeline)
+    /// The full path will be `.hoodie/{TimelinePath}`
+    TimelinePath,
+
+    /// Path for LSM timeline history for layout v2, relative to timeline path 
(default: history)
+    /// The full path will be `.hoodie/{TimelinePath}/{TimelineHistoryPath}`
+    TimelineHistoryPath,
 }
 
 impl AsRef<str> for HudiTableConfig {
@@ -141,6 +152,9 @@ impl AsRef<str> for HudiTableConfig {
             Self::TableVersion => "hoodie.table.version",
             Self::TimelineLayoutVersion => "hoodie.timeline.layout.version",
             Self::TimelineTimezone => "hoodie.table.timeline.timezone",
+            Self::ArchiveLogFolder => "hoodie.archivelog.folder",
+            Self::TimelinePath => "hoodie.timeline.path",
+            Self::TimelineHistoryPath => "hoodie.timeline.history.path",
         }
     }
 }
@@ -166,6 +180,9 @@ impl ConfigParser for HudiTableConfig {
             Self::TimelineTimezone => Some(HudiConfigValue::String(
                 TimelineTimezoneValue::UTC.as_ref().to_string(),
             )),
+            Self::ArchiveLogFolder => 
Some(HudiConfigValue::String(".hoodie/archived".to_string())),
+            Self::TimelinePath => 
Some(HudiConfigValue::String("timeline".to_string())),
+            Self::TimelineHistoryPath => 
Some(HudiConfigValue::String("history".to_string())),
             _ => None,
         }
     }
@@ -238,6 +255,9 @@ impl ConfigParser for HudiTableConfig {
             Self::TimelineTimezone => get_result
                 .and_then(TimelineTimezoneValue::from_str)
                 .map(|v| HudiConfigValue::String(v.as_ref().to_string())),
+            Self::ArchiveLogFolder => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
+            Self::TimelinePath => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
+            Self::TimelineHistoryPath => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
         }
     }
 
diff --git a/crates/core/src/file_group/reader.rs 
b/crates/core/src/file_group/reader.rs
index 53abbfb..ae717fb 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -75,8 +75,8 @@ impl FileGroupReader {
     /// Creates a new reader with the given base URI and options.
     ///
     /// # Arguments
-    ///     * `base_uri` - The base URI of the file group's residing table.
-    ///     * `options` - Additional options for the reader.
+    /// * `base_uri` - The base URI of the file group's residing table.
+    /// * `options` - Additional options for the reader.
     ///
     /// # Notes
     /// This API uses [`OptionResolver`] that loads table properties from 
storage to resolve options.
@@ -164,7 +164,7 @@ impl FileGroupReader {
     /// Reads the data from the base file at the given relative path.
     ///
     /// # Arguments
-    ///     * `relative_path` - The relative path to the base file.
+    /// * `relative_path` - The relative path to the base file.
     ///
     /// # Returns
     /// A record batch read from the base file.
@@ -216,7 +216,7 @@ impl FileGroupReader {
     /// Reads the data from the given file slice.
     ///
     /// # Arguments
-    ///     * `file_slice` - The file slice to read.
+    /// * `file_slice` - The file slice to read.
     ///
     /// # Returns
     /// A record batch read from the file slice.
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 3ba206a..9438db5 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -23,11 +23,11 @@
 //! **Example**
 //!
 //! ```rust
-//! use hudi_core::config::read::HudiReadConfig::{AsOfTimestamp, 
InputPartitions};
+//! use hudi_core::config::read::HudiReadConfig::InputPartitions;
 //! use hudi_core::table::Table as HudiTable;
 //!
-//! let options = [(InputPartitions, "2"), (AsOfTimestamp, 
"20240101010100000")];
-//! HudiTable::new_with_options("/tmp/hudi_data", options);
+//! let options = [(InputPartitions, "2")];
+//! HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
 //! ```
 //!
 //! # The [table] module is responsible for managing Hudi tables.
diff --git a/crates/core/src/metadata/commit.rs 
b/crates/core/src/metadata/commit.rs
index 579874b..6d898e1 100644
--- a/crates/core/src/metadata/commit.rs
+++ b/crates/core/src/metadata/commit.rs
@@ -19,17 +19,23 @@
 
 use crate::error::CoreError;
 use crate::Result;
+use apache_avro::from_value;
+use apache_avro::Reader as AvroReader;
 use apache_avro_derive::AvroSchema as DeriveAvroSchema;
 use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
 use std::collections::HashMap;
+use std::io::Cursor;
 
 /// Represents statistics for a single file write operation in a commit
 ///
 /// This struct is automatically derived to/from Avro schema using 
apache-avro-derive.
 /// The Avro schema can be accessed via `HoodieWriteStat::get_schema()`.
-#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
-#[serde(rename_all = "camelCase")]
+///
+/// Note: For v8+ tables with Avro format, additional fields may be present in 
the data
+/// that are not captured here. Use `#[serde(default)]` to handle missing 
fields gracefully.
+#[derive(Debug, Clone, Default, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase", default)]
 #[avro(namespace = "org.apache.hudi.avro.model")]
 pub struct HoodieWriteStat {
     #[avro(rename = "fileId")]
@@ -53,6 +59,43 @@ pub struct HoodieWriteStat {
     pub total_write_bytes: Option<i64>,
     #[avro(rename = "totalWriteErrors")]
     pub total_write_errors: Option<i64>,
+    // Additional fields from v8+ Avro schema
+    #[avro(rename = "partitionPath")]
+    pub partition_path: Option<String>,
+    #[avro(rename = "totalLogRecords")]
+    pub total_log_records: Option<i64>,
+    #[avro(rename = "totalLogFiles")]
+    pub total_log_files: Option<i64>,
+    #[avro(rename = "totalUpdatedRecordsCompacted")]
+    pub total_updated_records_compacted: Option<i64>,
+    #[avro(rename = "totalLogBlocks")]
+    pub total_log_blocks: Option<i64>,
+    #[avro(rename = "totalCorruptLogBlock")]
+    pub total_corrupt_log_block: Option<i64>,
+    #[avro(rename = "totalRollbackBlocks")]
+    pub total_rollback_blocks: Option<i64>,
+    #[avro(rename = "fileSizeInBytes")]
+    pub file_size_in_bytes: Option<i64>,
+    #[avro(rename = "logVersion")]
+    pub log_version: Option<i32>,
+    #[avro(rename = "logOffset")]
+    pub log_offset: Option<i64>,
+    #[avro(rename = "prevBaseFile")]
+    pub prev_base_file: Option<String>,
+    #[avro(rename = "minEventTime")]
+    pub min_event_time: Option<i64>,
+    #[avro(rename = "maxEventTime")]
+    pub max_event_time: Option<i64>,
+    #[avro(rename = "totalLogFilesCompacted")]
+    pub total_log_files_compacted: Option<i64>,
+    #[avro(rename = "totalLogReadTimeMs")]
+    pub total_log_read_time_ms: Option<i64>,
+    #[avro(rename = "totalLogSizeCompacted")]
+    pub total_log_size_compacted: Option<i64>,
+    #[avro(rename = "tempPath")]
+    pub temp_path: Option<String>,
+    #[avro(rename = "numUpdates")]
+    pub num_updates: Option<i64>,
 }
 
 /// Represents the metadata for a Hudi commit
@@ -63,13 +106,14 @@ pub struct HoodieWriteStat {
 /// # Example
 /// ```
 /// use hudi_core::metadata::commit::HoodieCommitMetadata;
+/// use apache_avro::schema::AvroSchema;
 ///
 /// // Get the Avro schema
 /// let schema = HoodieCommitMetadata::get_schema();
 /// println!("Schema: {}", schema.canonical_form());
 /// ```
-#[derive(Debug, Clone, Serialize, Deserialize, DeriveAvroSchema)]
-#[serde(rename_all = "camelCase")]
+#[derive(Debug, Clone, Default, Serialize, Deserialize, DeriveAvroSchema)]
+#[serde(rename_all = "camelCase", default)]
 #[avro(namespace = "org.apache.hudi.avro.model")]
 pub struct HoodieCommitMetadata {
     pub version: Option<i32>,
@@ -99,6 +143,44 @@ impl HoodieCommitMetadata {
         })
     }
 
+    /// Parse commit metadata from Avro bytes (v8+ format)
+    ///
+    /// The Avro data should be in Avro Object Container format with an 
embedded schema.
+    /// This format is used by table version 8 and later for 
commit/deltacommit instants.
+    pub fn from_avro_bytes(bytes: &[u8]) -> Result<Self> {
+        let cursor = Cursor::new(bytes);
+        let reader = AvroReader::new(cursor).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to create Avro reader: 
{}", e))
+        })?;
+
+        // The commit metadata file should contain exactly one record
+        let mut records = reader;
+        let value = records
+            .next()
+            .ok_or_else(|| CoreError::CommitMetadata("Avro file contains no 
records".to_string()))?
+            .map_err(|e| CoreError::CommitMetadata(format!("Failed to read 
Avro record: {}", e)))?;
+
+        from_value::<Self>(&value).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to deserialize Avro 
value: {}", e))
+        })
+    }
+
+    /// Convert commit metadata to a JSON Map for compatibility with existing 
code
+    ///
+    /// This is useful when the metadata is read from Avro format but needs to 
be
+    /// processed by code that expects a serde_json Map.
+    pub fn to_json_map(&self) -> Result<Map<String, Value>> {
+        let value = serde_json::to_value(self).map_err(|e| {
+            CoreError::CommitMetadata(format!("Failed to convert to JSON 
value: {}", e))
+        })?;
+        match value {
+            Value::Object(map) => Ok(map),
+            _ => Err(CoreError::CommitMetadata(
+                "Expected JSON object".to_string(),
+            )),
+        }
+    }
+
     /// Get the write stats for a specific partition
     pub fn get_partition_write_stats(&self, partition: &str) -> 
Option<&Vec<HoodieWriteStat>> {
         self.partition_to_write_stats
@@ -432,4 +514,144 @@ mod tests {
         let count = metadata.iter_replace_file_ids().count();
         assert_eq!(count, 0);
     }
+
+    #[test]
+    fn test_parse_v6_commit_json() {
+        // Test parsing v6 COW table commit metadata (JSON format)
+        let file_path = concat!(
+            env!("CARGO_MANIFEST_DIR"),
+            "/tests/data/commit_metadata/v6_commit.json"
+        );
+        let bytes = std::fs::read(file_path).expect("Failed to read test 
fixture");
+
+        let metadata = HoodieCommitMetadata::from_json_bytes(&bytes)
+            .expect("Failed to parse v6 commit metadata");
+
+        // Validate important fields
+        assert_eq!(metadata.operation_type, Some("UPSERT".to_string()));
+        assert_eq!(metadata.compacted, Some(false));
+
+        // Validate partition write stats
+        let write_stats = metadata
+            .get_partition_write_stats("")
+            .expect("Should have write stats for empty partition");
+        assert_eq!(write_stats.len(), 1);
+
+        let stat = &write_stats[0];
+        assert_eq!(
+            stat.file_id,
+            Some("a079bdb3-731c-4894-b855-abfcd6921007-0".to_string())
+        );
+        assert_eq!(stat.num_writes, Some(4));
+        assert_eq!(stat.num_inserts, Some(1));
+        assert_eq!(stat.num_deletes, Some(0));
+        assert_eq!(stat.num_update_writes, Some(1));
+        assert_eq!(stat.total_write_bytes, Some(441520));
+        assert_eq!(stat.prev_commit, Some("20240418173550988".to_string()));
+    }
+
+    #[test]
+    fn test_parse_v8_deltacommit_avro() {
+        // Test parsing v8 MOR table deltacommit metadata (Avro format)
+        let file_path = concat!(
+            env!("CARGO_MANIFEST_DIR"),
+            "/tests/data/commit_metadata/v8_deltacommit.avro"
+        );
+        let bytes = std::fs::read(file_path).expect("Failed to read test 
fixture");
+
+        let metadata = HoodieCommitMetadata::from_avro_bytes(&bytes)
+            .expect("Failed to parse v8 deltacommit metadata");
+
+        // Validate important fields
+        assert_eq!(metadata.operation_type, Some("UPSERT".to_string()));
+
+        // Validate partition write stats - v8 MOR table should have stats
+        assert!(metadata.partition_to_write_stats.is_some());
+        let partition_map = 
metadata.partition_to_write_stats.as_ref().unwrap();
+
+        // Should have write stats for multiple partitions
+        assert!(
+            !partition_map.is_empty(),
+            "Should have write stats for at least one partition"
+        );
+
+        // Validate at least one partition has valid stats
+        for (partition, stats) in partition_map {
+            if !stats.is_empty() {
+                let stat = &stats[0];
+                // file_id should be present
+                assert!(
+                    stat.file_id.is_some(),
+                    "file_id should be present in partition {}",
+                    partition
+                );
+                // For UPSERT operation, num_inserts or num_update_writes 
should be present
+                if let Some(num_inserts) = stat.num_inserts {
+                    assert!(num_inserts >= 0, "num_inserts should be >= 0");
+                }
+                if let Some(num_updates) = stat.num_update_writes {
+                    assert!(num_updates >= 0, "num_update_writes should be >= 
0");
+                }
+                // Verify partition_path field exists (v8+ specific field)
+                assert!(
+                    stat.partition_path.is_some(),
+                    "partition_path should be present in v8+ format"
+                );
+                break;
+            }
+        }
+    }
+
+    #[test]
+    fn test_to_json_map() {
+        let metadata = HoodieCommitMetadata {
+            version: Some(1),
+            operation_type: Some("INSERT".to_string()),
+            ..Default::default()
+        };
+
+        let json_map = metadata.to_json_map().unwrap();
+        assert!(json_map.contains_key("version"));
+        assert!(json_map.contains_key("operationType"));
+    }
+
+    #[test]
+    fn test_from_json_map() {
+        let mut map = Map::new();
+        map.insert("version".to_string(), json!(2));
+        map.insert("operationType".to_string(), json!("UPSERT"));
+
+        let metadata = HoodieCommitMetadata::from_json_map(&map).unwrap();
+        assert_eq!(metadata.version, Some(2));
+        assert_eq!(metadata.operation_type, Some("UPSERT".to_string()));
+    }
+
+    #[test]
+    fn test_get_partitions_with_replacements_sorting() {
+        let json = json!({
+            "partitionToReplaceFileIds": {
+                "p1": ["file1"],
+                "p2": ["file2", "file3"]
+            }
+        });
+
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+        let mut partitions = metadata.get_partitions_with_replacements();
+        partitions.sort();
+        assert_eq!(partitions, vec!["p1".to_string(), "p2".to_string()]);
+    }
+
+    #[test]
+    fn test_iter_replace_file_ids_multiple_partitions() {
+        let json = json!({
+            "partitionToReplaceFileIds": {
+                "p1": ["file1"],
+                "p2": ["file2", "file3"]
+            }
+        });
+
+        let metadata: HoodieCommitMetadata = 
serde_json::from_value(json).unwrap();
+        let count = metadata.iter_replace_file_ids().count();
+        assert_eq!(count, 3); // 1 from p1, 2 from p2
+    }
 }
diff --git a/crates/core/src/timeline/builder.rs 
b/crates/core/src/timeline/builder.rs
new file mode 100644
index 0000000..43a55e0
--- /dev/null
+++ b/crates/core/src/timeline/builder.rs
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config::internal::HudiInternalConfig::TimelineArchivedReadEnabled;
+use crate::config::table::HudiTableConfig::{TableVersion, 
TimelineLayoutVersion};
+use crate::config::HudiConfigs;
+use crate::error::CoreError;
+use crate::storage::Storage;
+use crate::timeline::loader::TimelineLoader;
+use crate::timeline::Timeline;
+use crate::Result;
+use std::sync::Arc;
+
+pub struct TimelineBuilder {
+    hudi_configs: Arc<HudiConfigs>,
+    storage: Arc<Storage>,
+}
+
+impl TimelineBuilder {
+    pub fn new(hudi_configs: Arc<HudiConfigs>, storage: Arc<Storage>) -> Self {
+        Self {
+            hudi_configs,
+            storage,
+        }
+    }
+
+    pub async fn build(self) -> Result<Timeline> {
+        let (active_loader, archived_loader) = self.resolve_loader_config()?;
+        let timeline = Timeline::new(
+            self.hudi_configs,
+            self.storage,
+            active_loader,
+            archived_loader,
+        );
+        Ok(timeline)
+    }
+
+    fn resolve_loader_config(&self) -> Result<(TimelineLoader, 
Option<TimelineLoader>)> {
+        // Check if archived timeline reading is enabled
+        let archived_enabled: bool = self
+            .hudi_configs
+            .get_or_default(TimelineArchivedReadEnabled)
+            .into();
+
+        let table_version: isize = match self.hudi_configs.get(TableVersion) {
+            Ok(v) => v.into(),
+            Err(_) => {
+                let archived_loader = if archived_enabled {
+                    Some(TimelineLoader::new_layout_one_archived(
+                        self.hudi_configs.clone(),
+                        self.storage.clone(),
+                    ))
+                } else {
+                    None
+                };
+                return Ok((
+                    TimelineLoader::new_layout_one_active(
+                        self.hudi_configs.clone(),
+                        self.storage.clone(),
+                    ),
+                    archived_loader,
+                ));
+            }
+        };
+
+        let layout_version: isize = self
+            .hudi_configs
+            .try_get(TimelineLayoutVersion)
+            .map(|v| v.into())
+            .unwrap_or_else(|| if table_version == 8 { 2isize } else { 1isize 
});
+
+        match layout_version {
+            1 => {
+                if !(6..=8).contains(&table_version) {
+                    return Err(CoreError::Unsupported(format!(
+                        "Unsupported table version {} with timeline layout 
version {}",
+                        table_version, layout_version
+                    )));
+                }
+                let archived_loader = if archived_enabled {
+                    Some(TimelineLoader::new_layout_one_archived(
+                        self.hudi_configs.clone(),
+                        self.storage.clone(),
+                    ))
+                } else {
+                    None
+                };
+                Ok((
+                    TimelineLoader::new_layout_one_active(
+                        self.hudi_configs.clone(),
+                        self.storage.clone(),
+                    ),
+                    archived_loader,
+                ))
+            }
+            2 => {
+                if table_version < 8 {
+                    return Err(CoreError::Unsupported(format!(
+                        "Unsupported table version {} with timeline layout 
version {}",
+                        table_version, layout_version
+                    )));
+                }
+                let archived_loader = if archived_enabled {
+                    Some(TimelineLoader::new_layout_two_archived(
+                        self.hudi_configs.clone(),
+                        self.storage.clone(),
+                    ))
+                } else {
+                    None
+                };
+                Ok((
+                    TimelineLoader::new_layout_two_active(
+                        self.hudi_configs.clone(),
+                        self.storage.clone(),
+                    ),
+                    archived_loader,
+                ))
+            }
+            _ => Err(CoreError::Unsupported(format!(
+                "Unsupported timeline layout version: {}",
+                layout_version
+            ))),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::config::table::HudiTableConfig;
+    use std::collections::HashMap;
+
+    fn create_test_configs(options: HashMap<String, String>) -> 
Arc<HudiConfigs> {
+        let mut opts = options;
+        opts.entry(HudiTableConfig::BasePath.as_ref().to_string())
+            .or_insert("/tmp/test".to_string());
+        Arc::new(HudiConfigs::new(opts))
+    }
+
+    fn create_test_storage(configs: Arc<HudiConfigs>) -> Arc<Storage> {
+        Storage::new(Arc::new(HashMap::new()), configs).unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_build_without_table_version() {
+        let configs = create_test_configs(HashMap::new());
+        let storage = create_test_storage(configs.clone());
+        let builder = TimelineBuilder::new(configs, storage);
+
+        let timeline = builder.build().await.unwrap();
+        // Should default to layout one active
+        assert!(!timeline.active_loader.is_layout_two_active());
+        assert!(timeline.archived_loader.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_build_with_table_version_6() {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::TableVersion.as_ref().to_string(),
+            "6".to_string(),
+        );
+        let configs = create_test_configs(options);
+        let storage = create_test_storage(configs.clone());
+        let builder = TimelineBuilder::new(configs, storage);
+
+        let timeline = builder.build().await.unwrap();
+        // v6 should use layout one
+        assert!(!timeline.active_loader.is_layout_two_active());
+        assert!(timeline.archived_loader.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_build_with_table_version_8_defaults_to_layout_2() {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::TableVersion.as_ref().to_string(),
+            "8".to_string(),
+        );
+        let configs = create_test_configs(options);
+        let storage = create_test_storage(configs.clone());
+        let builder = TimelineBuilder::new(configs, storage);
+
+        let timeline = builder.build().await.unwrap();
+        // v8 without explicit layout version defaults to layout 2
+        assert!(timeline.active_loader.is_layout_two_active());
+        assert!(timeline.archived_loader.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_build_with_explicit_layout_version_1() {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::TableVersion.as_ref().to_string(),
+            "8".to_string(),
+        );
+        options.insert(
+            HudiTableConfig::TimelineLayoutVersion.as_ref().to_string(),
+            "1".to_string(),
+        );
+        let configs = create_test_configs(options);
+        let storage = create_test_storage(configs.clone());
+        let builder = TimelineBuilder::new(configs, storage);
+
+        let timeline = builder.build().await.unwrap();
+        // Explicit layout version 1
+        assert!(!timeline.active_loader.is_layout_two_active());
+    }
+
+    #[tokio::test]
+    async fn test_build_with_archived_enabled() {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::TableVersion.as_ref().to_string(),
+            "8".to_string(),
+        );
+        options.insert(
+            TimelineArchivedReadEnabled.as_ref().to_string(),
+            "true".to_string(),
+        );
+        let configs = create_test_configs(options);
+        let storage = create_test_storage(configs.clone());
+        let builder = TimelineBuilder::new(configs, storage);
+
+        let timeline = builder.build().await.unwrap();
+        assert!(timeline.active_loader.is_layout_two_active());
+        assert!(timeline.archived_loader.is_some());
+        assert!(timeline
+            .archived_loader
+            .as_ref()
+            .unwrap()
+            .is_layout_two_archived());
+    }
+
+    #[tokio::test]
+    async fn test_build_layout_1_with_archived() {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::TableVersion.as_ref().to_string(),
+            "7".to_string(),
+        );
+        options.insert(
+            TimelineArchivedReadEnabled.as_ref().to_string(),
+            "true".to_string(),
+        );
+        let configs = create_test_configs(options);
+        let storage = create_test_storage(configs.clone());
+        let builder = TimelineBuilder::new(configs, storage);
+
+        let timeline = builder.build().await.unwrap();
+        assert!(!timeline.active_loader.is_layout_two_active());
+        assert!(timeline.archived_loader.is_some());
+        assert!(!timeline
+            .archived_loader
+            .as_ref()
+            .unwrap()
+            .is_layout_two_archived());
+    }
+
+    #[tokio::test]
+    async fn test_unsupported_table_version_with_layout_1() {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::TableVersion.as_ref().to_string(),
+            "5".to_string(),
+        );
+        options.insert(
+            HudiTableConfig::TimelineLayoutVersion.as_ref().to_string(),
+            "1".to_string(),
+        );
+        let configs = create_test_configs(options);
+        let storage = create_test_storage(configs.clone());
+        let builder = TimelineBuilder::new(configs, storage);
+
+        let result = builder.build().await;
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("Unsupported table version 5"));
+    }
+
+    #[tokio::test]
+    async fn test_unsupported_table_version_with_layout_2() {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::TableVersion.as_ref().to_string(),
+            "7".to_string(),
+        );
+        options.insert(
+            HudiTableConfig::TimelineLayoutVersion.as_ref().to_string(),
+            "2".to_string(),
+        );
+        let configs = create_test_configs(options);
+        let storage = create_test_storage(configs.clone());
+        let builder = TimelineBuilder::new(configs, storage);
+
+        let result = builder.build().await;
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("Unsupported table version 7"));
+    }
+
+    #[tokio::test]
+    async fn test_unsupported_layout_version() {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::TableVersion.as_ref().to_string(),
+            "8".to_string(),
+        );
+        options.insert(
+            HudiTableConfig::TimelineLayoutVersion.as_ref().to_string(),
+            "3".to_string(),
+        );
+        let configs = create_test_configs(options);
+        let storage = create_test_storage(configs.clone());
+        let builder = TimelineBuilder::new(configs, storage);
+
+        let result = builder.build().await;
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("Unsupported timeline layout version: 3"));
+    }
+}
diff --git a/crates/core/src/timeline/instant.rs 
b/crates/core/src/timeline/instant.rs
index e3e191e..1cd29af 100644
--- a/crates/core/src/timeline/instant.rs
+++ b/crates/core/src/timeline/instant.rs
@@ -98,10 +98,19 @@ impl AsRef<str> for State {
 }
 
 /// An [Instant] represents a point in time when an action was performed on 
the table.
+///
+/// For table version 8+, completed instants have a different filename format:
+/// `{requestedTimestamp}_{completedTimestamp}.{action}` instead of 
`{timestamp}.{action}`.
+/// The `timestamp` field stores the requested timestamp, and 
`completed_timestamp` stores
+/// the completion timestamp for v8+ completed instants.
 #[allow(dead_code)]
 #[derive(Clone, Debug, Eq, PartialEq)]
 pub struct Instant {
+    /// The timestamp when the action was requested (used for ordering and 
identification).
+    /// TODO rename to requested_timestamp for clarity in v8+?
     pub timestamp: String,
+    /// The timestamp when the action completed (only present for v8+ 
completed instants).
+    pub completed_timestamp: Option<String>,
     pub action: Action,
     pub state: State,
     pub epoch_millis: i64,
@@ -132,19 +141,47 @@ impl FromStr for Instant {
 
 impl Instant {
     pub fn try_from_file_name_and_timezone(file_name: &str, timezone: &str) -> 
Result<Self> {
-        let (timestamp, action_suffix) = file_name
+        let (timestamp_part, action_suffix) = file_name
             .split_once('.')
             .ok_or_else(|| CoreError::Timeline(format!("Invalid file name: 
{}", file_name)))?;
-        Self::validate_timestamp(timestamp)?;
-        let dt = Self::parse_datetime(timestamp, timezone)?;
+
         let (action, state) = Self::parse_action_and_state(action_suffix)?;
 
-        Ok(Self {
-            timestamp: timestamp.to_string(),
-            state,
-            action,
-            epoch_millis: dt.timestamp_millis(),
-        })
+        // Check for v8+ completed instant format: 
{requestedTimestamp}_{completedTimestamp}.{action}
+        // This format is only used for completed instants (state == Completed)
+        if let Some((requested_ts, completed_ts)) = 
timestamp_part.split_once('_') {
+            // This is a v8+ completed instant with both requested and 
completed timestamps
+            Self::validate_timestamp(requested_ts)?;
+            Self::validate_timestamp(completed_ts)?;
+            let dt = Self::parse_datetime(requested_ts, timezone)?;
+
+            if state != State::Completed {
+                return Err(CoreError::Timeline(format!(
+                    "Underscore timestamp format is only valid for completed 
instants: {}",
+                    file_name
+                )));
+            }
+
+            Ok(Self {
+                timestamp: requested_ts.to_string(),
+                completed_timestamp: Some(completed_ts.to_string()),
+                state,
+                action,
+                epoch_millis: dt.timestamp_millis(),
+            })
+        } else {
+            // pre v8 format: {timestamp}.{action}[.{state}]
+            Self::validate_timestamp(timestamp_part)?;
+            let dt = Self::parse_datetime(timestamp_part, timezone)?;
+
+            Ok(Self {
+                timestamp: timestamp_part.to_string(),
+                completed_timestamp: None,
+                state,
+                action,
+                epoch_millis: dt.timestamp_millis(),
+            })
+        }
     }
 
     fn validate_timestamp(timestamp: &str) -> Result<()> {
@@ -203,7 +240,19 @@ impl Instant {
 
     pub fn file_name(&self) -> String {
         match (&self.action, &self.state) {
-            (_, State::Completed) => format!("{}.{}", self.timestamp, 
self.action.as_ref()),
+            (_, State::Completed) => {
+                // For v8+ completed instants with completed_timestamp, use 
the underscore format
+                if let Some(completed_ts) = &self.completed_timestamp {
+                    format!(
+                        "{}_{}.{}",
+                        self.timestamp,
+                        completed_ts,
+                        self.action.as_ref()
+                    )
+                } else {
+                    format!("{}.{}", self.timestamp, self.action.as_ref())
+                }
+            }
             (Action::Commit, State::Inflight) => {
                 format!("{}.{}", self.timestamp, self.state.as_ref())
             }
@@ -216,8 +265,16 @@ impl Instant {
         }
     }
 
+    /// Get the relative path with the default `.hoodie/` base directory.
+    /// For v8+ tables with Layout Version 2, use `relative_path_with_base` 
instead.
     pub fn relative_path(&self) -> Result<String> {
-        let mut commit_file_path = PathBuf::from(HUDI_METADATA_DIR);
+        self.relative_path_with_base(HUDI_METADATA_DIR)
+    }
+
+    /// Get the relative path with a specified base directory.
+    /// Use `.hoodie/` for pre-v8 tables (Layout Version 1) or 
`.hoodie/timeline/` for v8+ tables (Layout Version 2).
+    pub fn relative_path_with_base(&self, base_dir: &str) -> Result<String> {
+        let mut commit_file_path = PathBuf::from(base_dir);
         commit_file_path.push(self.file_name());
         commit_file_path
             .to_str()
@@ -379,4 +436,106 @@ mod tests {
             None => std::env::remove_var("TZ"),
         }
     }
+
+    #[test]
+    fn test_parse_action_and_state() -> Result<()> {
+        // Test action with state suffix
+        let (action, state) = 
Instant::parse_action_and_state("commit.inflight")?;
+        assert_eq!(action, Action::Commit);
+        assert_eq!(state, State::Inflight);
+
+        let (action, state) = 
Instant::parse_action_and_state("deltacommit.requested")?;
+        assert_eq!(action, Action::DeltaCommit);
+        assert_eq!(state, State::Requested);
+
+        // Test standalone "inflight" special case
+        let (action, state) = Instant::parse_action_and_state("inflight")?;
+        assert_eq!(action, Action::Commit);
+        assert_eq!(state, State::Inflight);
+
+        // Test completed state (no suffix)
+        let (action, state) = Instant::parse_action_and_state("commit")?;
+        assert_eq!(action, Action::Commit);
+        assert_eq!(state, State::Completed);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_relative_path_with_base() -> Result<()> {
+        let instant = Instant::from_str("20240101120000.commit")?;
+
+        let path = instant.relative_path_with_base(".hoodie")?;
+        assert_eq!(path, ".hoodie/20240101120000.commit");
+
+        let path = instant.relative_path_with_base(".hoodie/timeline")?;
+        assert_eq!(path, ".hoodie/timeline/20240101120000.commit");
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_is_replacecommit() -> Result<()> {
+        let replace_instant = 
Instant::from_str("20240101120000.replacecommit")?;
+        assert!(replace_instant.is_replacecommit());
+
+        let commit_instant = Instant::from_str("20240101120000.commit")?;
+        assert!(!commit_instant.is_replacecommit());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_v8_instant_with_completed_timestamp() -> Result<()> {
+        // v8+ format: {requestedTimestamp}_{completedTimestamp}.{action}
+        let instant = 
Instant::from_str("20240101120000000_20240101120005000.commit")?;
+        assert_eq!(instant.timestamp, "20240101120000000");
+        assert_eq!(
+            instant.completed_timestamp,
+            Some("20240101120005000".to_string())
+        );
+        assert_eq!(instant.action, Action::Commit);
+        assert_eq!(instant.state, State::Completed);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_v8_instant_underscore_format_requires_completed_state() {
+        // Underscore format with non-completed state should fail
+        let result = 
Instant::from_str("20240101120000000_20240101120005000.commit.inflight");
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("only valid for completed"));
+    }
+
+    #[test]
+    fn test_file_name_with_completed_timestamp() -> Result<()> {
+        let instant = 
Instant::from_str("20240101120000000_20240101120005000.commit")?;
+        let file_name = instant.file_name();
+        assert_eq!(file_name, "20240101120000000_20240101120005000.commit");
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_parse_datetime_with_milliseconds() -> Result<()> {
+        let dt = Instant::parse_datetime("20240315142530500", "UTC")?;
+        assert_eq!(dt.timestamp_millis() % 1000, 500);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_validate_timestamp_errors() {
+        // Too short
+        let result = Instant::from_str("2024.commit");
+        assert!(result.is_err());
+
+        // Too long (not 14 or 17)
+        let result = Instant::from_str("202403151425301.commit");
+        assert!(result.is_err());
+    }
 }
diff --git a/crates/core/src/timeline/loader.rs 
b/crates/core/src/timeline/loader.rs
new file mode 100644
index 0000000..52db158
--- /dev/null
+++ b/crates/core/src/timeline/loader.rs
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config::table::HudiTableConfig::{ArchiveLogFolder, 
TimelineHistoryPath, TimelinePath};
+use crate::config::HudiConfigs;
+use crate::error::CoreError;
+use crate::metadata::commit::HoodieCommitMetadata;
+use crate::metadata::HUDI_METADATA_DIR;
+use crate::storage::Storage;
+use crate::timeline::instant::Instant;
+use crate::timeline::selector::TimelineSelector;
+use crate::Result;
+use log::debug;
+use serde_json::{Map, Value};
+use std::sync::Arc;
+
+#[derive(Debug, Clone)]
+pub struct TimelineLoader {
+    hudi_configs: Arc<HudiConfigs>,
+    storage: Arc<Storage>,
+    layout: TimelineLayout,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum TimelineLayout {
+    V1Active,
+    V1Archived,
+    V2Active,
+    V2Archived,
+}
+
+#[allow(dead_code)]
+impl TimelineLoader {
+    /// Create a new Layout One Active loader
+    pub fn new_layout_one_active(hudi_configs: Arc<HudiConfigs>, storage: 
Arc<Storage>) -> Self {
+        Self {
+            hudi_configs,
+            storage,
+            layout: TimelineLayout::V1Active,
+        }
+    }
+
+    /// Create a new Layout One Archived loader
+    pub fn new_layout_one_archived(hudi_configs: Arc<HudiConfigs>, storage: 
Arc<Storage>) -> Self {
+        Self {
+            hudi_configs,
+            storage,
+            layout: TimelineLayout::V1Archived,
+        }
+    }
+
+    /// Create a new Layout Two Active loader
+    pub fn new_layout_two_active(hudi_configs: Arc<HudiConfigs>, storage: 
Arc<Storage>) -> Self {
+        Self {
+            hudi_configs,
+            storage,
+            layout: TimelineLayout::V2Active,
+        }
+    }
+
+    /// Create a new Layout Two Archived loader
+    pub fn new_layout_two_archived(hudi_configs: Arc<HudiConfigs>, storage: 
Arc<Storage>) -> Self {
+        Self {
+            hudi_configs,
+            storage,
+            layout: TimelineLayout::V2Archived,
+        }
+    }
+
+    /// Returns the storage for this loader.
+    fn storage(&self) -> &Arc<Storage> {
+        &self.storage
+    }
+
+    /// Check if this is a Layout Two Active loader (for testing/assertions)
+    #[cfg(test)]
+    pub(crate) fn is_layout_two_active(&self) -> bool {
+        matches!(self.layout, TimelineLayout::V2Active)
+    }
+
+    /// Check if this is a Layout Two Archived loader (for testing/assertions)
+    #[cfg(test)]
+    pub(crate) fn is_layout_two_archived(&self) -> bool {
+        matches!(self.layout, TimelineLayout::V2Archived)
+    }
+
+    /// Returns the directory for active timeline instants.
+    ///
+    /// - Layout One (v6-v8): `.hoodie/`
+    /// - Layout Two (v8+): `.hoodie/{timeline_path}` (configurable via 
`hoodie.timeline.path`, default: `timeline`)
+    fn get_active_timeline_dir(&self) -> String {
+        match self.layout {
+            TimelineLayout::V1Active | TimelineLayout::V1Archived => 
HUDI_METADATA_DIR.to_string(),
+            TimelineLayout::V2Active | TimelineLayout::V2Archived => {
+                let timeline_path: String = 
self.hudi_configs.get_or_default(TimelinePath).into();
+                format!("{}/{}", HUDI_METADATA_DIR, timeline_path)
+            }
+        }
+    }
+
+    /// Returns the directory for archived timeline instants.
+    ///
+    /// - Layout One (v6-v8): configurable via `hoodie.archivelog.folder` 
(default: `.hoodie/archived`)
+    /// - Layout Two (v8+): `.hoodie/{timeline_path}/{history_path}` (LSM 
history)
+    fn get_archived_timeline_dir(&self) -> String {
+        match self.layout {
+            TimelineLayout::V1Active | TimelineLayout::V1Archived => {
+                // Layout 1 uses hoodie.archivelog.folder for archived timeline
+                self.hudi_configs.get_or_default(ArchiveLogFolder).into()
+            }
+            TimelineLayout::V2Active | TimelineLayout::V2Archived => {
+                // Layout 2 uses LSM history directory
+                let timeline_path: String = 
self.hudi_configs.get_or_default(TimelinePath).into();
+                let history_path: String =
+                    
self.hudi_configs.get_or_default(TimelineHistoryPath).into();
+                format!("{}/{}/{}", HUDI_METADATA_DIR, timeline_path, 
history_path)
+            }
+        }
+    }
+
+    /// Returns the appropriate timeline directory based on loader type 
(active vs archived).
+    ///
+    /// This is a convenience method that delegates to either 
`get_active_timeline_dir`
+    /// or `get_archived_timeline_dir` depending on the layout.
+    pub fn get_timeline_dir(&self) -> String {
+        match self.layout {
+            TimelineLayout::V1Active | TimelineLayout::V2Active => 
self.get_active_timeline_dir(),
+            TimelineLayout::V1Archived | TimelineLayout::V2Archived => {
+                self.get_archived_timeline_dir()
+            }
+        }
+    }
+
+    pub async fn load_instants(
+        &self,
+        selector: &TimelineSelector,
+        desc: bool,
+    ) -> Result<Vec<Instant>> {
+        match self.layout {
+            TimelineLayout::V1Active => {
+                let files = 
self.storage.list_files(Some(HUDI_METADATA_DIR)).await?;
+                let mut instants = Vec::with_capacity(files.len() / 3);
+
+                for file_info in files {
+                    match selector.try_create_instant(file_info.name.as_str()) 
{
+                        Ok(instant) => instants.push(instant),
+                        Err(e) => {
+                            debug!(
+                                "Instant not created from file {:?} due to: 
{:?}",
+                                file_info, e
+                            );
+                        }
+                    }
+                }
+
+                instants.sort_unstable();
+                instants.shrink_to_fit();
+
+                if desc {
+                    Ok(instants.into_iter().rev().collect())
+                } else {
+                    Ok(instants)
+                }
+            }
+            TimelineLayout::V2Active => {
+                let timeline_dir = self.get_timeline_dir();
+                let files = 
self.storage.list_files(Some(&timeline_dir)).await?;
+                let mut instants = Vec::new();
+
+                for file_info in files {
+                    // TODO: make `storage.list_files` api support such 
filtering, like ignore crc and return files only
+                    if file_info.name.starts_with("history/") || 
file_info.name.ends_with(".crc") {
+                        continue;
+                    }
+                    match selector.try_create_instant(file_info.name.as_str()) 
{
+                        Ok(instant) => instants.push(instant),
+                        Err(e) => {
+                            debug!(
+                                "Instant not created from file {:?} due to: 
{:?}",
+                                file_info, e
+                            );
+                        }
+                    }
+                }
+
+                instants.sort_unstable();
+                instants.shrink_to_fit();
+
+                if desc {
+                    Ok(instants.into_iter().rev().collect())
+                } else {
+                    Ok(instants)
+                }
+            }
+            _ => Err(CoreError::Unsupported(
+                "Loading from this timeline layout is not implemented 
yet.".to_string(),
+            )),
+        }
+    }
+
+    /// Load archived timeline instants based on selector criteria.
+    ///
+    /// # Behavior
+    ///
+    /// - Returns empty Vec if this is an active loader (not an archived 
loader)
+    /// - Attempts to load archived instants, propagating any errors
+    ///
+    /// Note: This method assumes the archived loader was created only when
+    /// `TimelineArchivedReadEnabled` is true. The config check is done in the 
builder.
+    ///
+    /// # Arguments
+    ///
+    /// * `selector` - The criteria for selecting instants (actions, states, 
time range)
+    /// * `desc` - If true, return instants in descending order by timestamp
+    pub(crate) async fn load_archived_instants(
+        &self,
+        selector: &TimelineSelector,
+        desc: bool,
+    ) -> Result<Vec<Instant>> {
+        // Early return for active loaders - they don't have archived parts
+        match self.layout {
+            TimelineLayout::V1Active | TimelineLayout::V2Active => return 
Ok(Vec::new()),
+            _ => {}
+        }
+
+        match self.layout {
+            TimelineLayout::V1Archived => {
+                // Resolve archive folder from configs or fallback
+                let archive_dir: String = 
self.hudi_configs.get_or_default(ArchiveLogFolder).into();
+
+                // List files and try creating instants through selector
+                let files = self.storage.list_files(Some(&archive_dir)).await?;
+                let mut instants = Vec::new();
+                for file_info in files {
+                    if let Ok(instant) = 
selector.try_create_instant(file_info.name.as_str()) {
+                        instants.push(instant);
+                    }
+                }
+                instants.sort_unstable();
+                if desc {
+                    instants.reverse();
+                }
+                Ok(instants)
+            }
+            TimelineLayout::V2Archived => {
+                // TODO: Implement v2 LSM history reader. For now, return 
empty.
+                let _ = (selector, desc);
+                Ok(Vec::new())
+            }
+            _ => Ok(Vec::new()),
+        }
+    }
+
+    /// Load instant metadata from storage and parse based on the layout 
version.
+    ///
+    /// Layout Version 1 (v6-v8): JSON format
+    /// Layout Version 2 (v8+): Avro format
+    ///
+    /// Returns the metadata as a JSON Map for uniform processing.
+    pub(crate) async fn load_instant_metadata(
+        &self,
+        instant: &Instant,
+    ) -> Result<Map<String, Value>> {
+        let timeline_dir = self.get_timeline_dir();
+        let path = instant.relative_path_with_base(&timeline_dir)?;
+        let bytes = self.storage.get_file_data(path.as_str()).await?;
+
+        match self.layout {
+            TimelineLayout::V1Active | TimelineLayout::V1Archived => {
+                // Layout 1: JSON format
+                serde_json::from_slice(&bytes).map_err(|e| {
+                    CoreError::Timeline(format!("Failed to parse JSON commit 
metadata: {}", e))
+                })
+            }
+            TimelineLayout::V2Active | TimelineLayout::V2Archived => {
+                // Layout 2: Avro format
+                let metadata = HoodieCommitMetadata::from_avro_bytes(&bytes)?;
+                metadata.to_json_map()
+            }
+        }
+    }
+
+    /// Load instant metadata and return as a JSON string.
+    ///
+    /// Layout Version 1 (v6-v8): Return raw JSON bytes
+    /// Layout Version 2 (v8+): Parse Avro and serialize to JSON
+    pub(crate) async fn load_instant_metadata_as_json(&self, instant: 
&Instant) -> Result<String> {
+        let timeline_dir = self.get_timeline_dir();
+        let path = instant.relative_path_with_base(&timeline_dir)?;
+        let bytes = self.storage.get_file_data(path.as_str()).await?;
+
+        match self.layout {
+            TimelineLayout::V1Active | TimelineLayout::V1Archived => {
+                // Layout 1: JSON format - return raw bytes as string
+                String::from_utf8(bytes.to_vec()).map_err(|e| {
+                    CoreError::Timeline(format!("Failed to convert JSON bytes 
to string: {}", e))
+                })
+            }
+            TimelineLayout::V2Active | TimelineLayout::V2Archived => {
+                // Layout 2: Avro format - deserialize then serialize to JSON
+                let metadata = HoodieCommitMetadata::from_avro_bytes(&bytes)?;
+                serde_json::to_string(&metadata).map_err(|e| {
+                    CoreError::Timeline(format!("Failed to serialize metadata 
to JSON: {}", e))
+                })
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::config::table::HudiTableConfig;
+    use crate::config::HudiConfigs;
+    use std::collections::HashMap;
+
+    fn create_test_configs() -> Arc<HudiConfigs> {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::BasePath.as_ref().to_string(),
+            "/tmp/test".to_string(),
+        );
+        Arc::new(HudiConfigs::new(options))
+    }
+
+    fn create_test_storage(configs: Arc<HudiConfigs>) -> Arc<Storage> {
+        Storage::new(Arc::new(HashMap::new()), configs).unwrap()
+    }
+
+    #[test]
+    fn test_layout_one_active_directory() {
+        let configs = create_test_configs();
+        let storage = create_test_storage(configs.clone());
+        let loader = TimelineLoader::new_layout_one_active(configs, storage);
+
+        assert_eq!(loader.get_active_timeline_dir(), HUDI_METADATA_DIR);
+        assert_eq!(loader.get_timeline_dir(), HUDI_METADATA_DIR);
+    }
+
+    #[test]
+    fn test_layout_one_archived_directory() {
+        let configs = create_test_configs();
+        let storage = create_test_storage(configs.clone());
+        let loader = TimelineLoader::new_layout_one_archived(configs, storage);
+
+        // Default archived folder
+        let expected = ".hoodie/archived";
+        assert_eq!(loader.get_archived_timeline_dir(), expected);
+        assert_eq!(loader.get_timeline_dir(), expected);
+    }
+
+    #[test]
+    fn test_layout_two_active_directory() {
+        let configs = create_test_configs();
+        let storage = create_test_storage(configs.clone());
+        let loader = TimelineLoader::new_layout_two_active(configs, storage);
+
+        // Default timeline path
+        let expected = format!("{}/timeline", HUDI_METADATA_DIR);
+        assert_eq!(loader.get_active_timeline_dir(), expected);
+        assert_eq!(loader.get_timeline_dir(), expected);
+    }
+
+    #[test]
+    fn test_layout_two_archived_directory() {
+        let configs = create_test_configs();
+        let storage = create_test_storage(configs.clone());
+        let loader = TimelineLoader::new_layout_two_archived(configs, storage);
+
+        // Default timeline path and history path
+        let expected = format!("{}/timeline/history", HUDI_METADATA_DIR);
+        assert_eq!(loader.get_archived_timeline_dir(), expected);
+        assert_eq!(loader.get_timeline_dir(), expected);
+    }
+
+    #[test]
+    fn test_custom_archive_folder() {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::BasePath.as_ref().to_string(),
+            "/tmp/test".to_string(),
+        );
+        options.insert(
+            HudiTableConfig::ArchiveLogFolder.as_ref().to_string(),
+            ".hoodie/custom_archive".to_string(),
+        );
+        let configs = Arc::new(HudiConfigs::new(options));
+        let storage = create_test_storage(configs.clone());
+        let loader = TimelineLoader::new_layout_one_archived(configs, storage);
+
+        assert_eq!(loader.get_archived_timeline_dir(), 
".hoodie/custom_archive");
+    }
+
+    #[test]
+    fn test_custom_timeline_paths() {
+        let mut options = HashMap::new();
+        options.insert(
+            HudiTableConfig::BasePath.as_ref().to_string(),
+            "/tmp/test".to_string(),
+        );
+        options.insert(
+            HudiTableConfig::TimelinePath.as_ref().to_string(),
+            "custom_timeline".to_string(),
+        );
+        options.insert(
+            HudiTableConfig::TimelineHistoryPath.as_ref().to_string(),
+            "custom_history".to_string(),
+        );
+        let configs = Arc::new(HudiConfigs::new(options));
+        let storage = create_test_storage(configs.clone());
+
+        let loader = TimelineLoader::new_layout_two_active(configs.clone(), 
storage.clone());
+        assert_eq!(
+            loader.get_active_timeline_dir(),
+            format!("{}/custom_timeline", HUDI_METADATA_DIR)
+        );
+
+        let archived_loader = TimelineLoader::new_layout_two_archived(configs, 
storage);
+        assert_eq!(
+            archived_loader.get_archived_timeline_dir(),
+            format!("{}/custom_timeline/custom_history", HUDI_METADATA_DIR)
+        );
+    }
+
+    #[test]
+    fn test_layout_type_checks() {
+        let configs = create_test_configs();
+        let storage = create_test_storage(configs.clone());
+
+        let v2_active = TimelineLoader::new_layout_two_active(configs.clone(), 
storage.clone());
+        assert!(v2_active.is_layout_two_active());
+        assert!(!v2_active.is_layout_two_archived());
+
+        let v2_archived = TimelineLoader::new_layout_two_archived(configs, 
storage);
+        assert!(!v2_archived.is_layout_two_active());
+        assert!(v2_archived.is_layout_two_archived());
+    }
+
+    #[test]
+    fn test_storage_access() {
+        let configs = create_test_configs();
+        let storage = create_test_storage(configs.clone());
+        let storage_ptr = Arc::as_ptr(&storage);
+
+        let loader = TimelineLoader::new_layout_one_active(configs, storage);
+
+        // Verify storage is accessible and is the same instance
+        assert_eq!(Arc::as_ptr(loader.storage()), storage_ptr);
+    }
+}
diff --git a/crates/core/src/timeline/lsm_tree.rs 
b/crates/core/src/timeline/lsm_tree.rs
new file mode 100644
index 0000000..276dc51
--- /dev/null
+++ b/crates/core/src/timeline/lsm_tree.rs
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::config::table::HudiTableConfig::{TimelineHistoryPath, TimelinePath};
+use crate::error::CoreError;
+use crate::metadata::HUDI_METADATA_DIR;
+use crate::storage::Storage;
+use crate::Result;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+
+#[cfg(not(tarpaulin_include))]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct TimelineManifest {
+    pub version: i64,
+    pub entries: Vec<ManifestEntry>,
+}
+
+/// Entry in an LSM timeline manifest.
+/// Each entry describes a compacted timeline file covering a time range
+/// in the LSM history directory.
+/// - `file_name`: relative path of the compacted timeline file under history
+/// - `min_instant`/`max_instant`: smallest/largest instant timestamps covered
+/// - `level`: LSM level where this file resides
+/// - `file_size`: size in bytes of the file
+#[cfg(not(tarpaulin_include))]
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ManifestEntry {
+    pub file_name: String,
+    pub min_instant: String,
+    pub max_instant: String,
+    pub level: i32,
+    pub file_size: i64,
+}
+
+/// LSM tree for v8+ timeline history management.
+/// The paths are resolved from configs on-the-fly via `hoodie.timeline.path` 
and `hoodie.timeline.history.path`.
+#[cfg(not(tarpaulin_include))]
+pub struct LSMTree {
+    storage: Arc<Storage>,
+}
+
+#[cfg(not(tarpaulin_include))]
+impl LSMTree {
+    pub fn new(storage: Arc<Storage>) -> Self {
+        Self { storage }
+    }
+
+    /// Returns the timeline directory path, resolved from configs.
+    pub fn timeline_dir(&self) -> String {
+        let timeline_path: String = self
+            .storage
+            .hudi_configs
+            .get_or_default(TimelinePath)
+            .into();
+        format!("{}/{}", HUDI_METADATA_DIR, timeline_path)
+    }
+
+    /// Returns the history directory path, resolved from configs.
+    pub fn history_dir(&self) -> String {
+        let timeline_path: String = self
+            .storage
+            .hudi_configs
+            .get_or_default(TimelinePath)
+            .into();
+        let history_path: String = self
+            .storage
+            .hudi_configs
+            .get_or_default(TimelineHistoryPath)
+            .into();
+        format!("{}/{}/{}", HUDI_METADATA_DIR, timeline_path, history_path)
+    }
+
+    pub async fn read_manifest(&self) -> Result<Option<TimelineManifest>> {
+        let history_dir = self.history_dir();
+        let version_path = format!("{}/_version_", history_dir);
+        if let Ok(data) = self.storage.get_file_data(&version_path).await {
+            let version_str =
+                String::from_utf8(data.to_vec()).map_err(|e| 
CoreError::Timeline(e.to_string()))?;
+            let version = version_str
+                .trim()
+                .parse::<i64>()
+                .map_err(|e| CoreError::Timeline(e.to_string()))?;
+            let manifest_path = format!("{}/manifest_{}", history_dir, 
version);
+            let data = self.storage.get_file_data(&manifest_path).await?;
+            let manifest: TimelineManifest =
+                serde_json::from_slice(&data).map_err(|e| 
CoreError::Timeline(e.to_string()))?;
+            Ok(Some(manifest))
+        } else {
+            Ok(None)
+        }
+    }
+}
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 094c07d..f916f11 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -16,7 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+pub mod builder;
 pub mod instant;
+pub mod loader;
+pub mod lsm_tree;
 pub(crate) mod selector;
 pub(crate) mod util;
 
@@ -24,17 +27,18 @@ use crate::config::HudiConfigs;
 use crate::error::CoreError;
 use crate::file_group::builder::{build_file_groups, 
build_replaced_file_groups, FileGroupMerger};
 use crate::file_group::FileGroup;
-use crate::metadata::HUDI_METADATA_DIR;
 use crate::schema::resolver::{
     resolve_avro_schema_from_commit_metadata, 
resolve_schema_from_commit_metadata,
 };
 use crate::storage::Storage;
+use crate::timeline::builder::TimelineBuilder;
 use crate::timeline::instant::Action;
+use crate::timeline::loader::TimelineLoader;
 use crate::timeline::selector::TimelineSelector;
 use crate::Result;
 use arrow_schema::Schema;
 use instant::Instant;
-use log::debug;
+
 use serde_json::{Map, Value};
 use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
@@ -46,6 +50,8 @@ use std::sync::Arc;
 pub struct Timeline {
     hudi_configs: Arc<HudiConfigs>,
     pub(crate) storage: Arc<Storage>,
+    active_loader: TimelineLoader,
+    archived_loader: Option<TimelineLoader>,
     pub completed_commits: Vec<Instant>,
 }
 
@@ -54,18 +60,19 @@ pub const DEFAULT_LOADING_ACTIONS: &[Action] =
     &[Action::Commit, Action::DeltaCommit, Action::ReplaceCommit];
 
 impl Timeline {
-    #[cfg(test)]
-    pub(crate) async fn new_from_completed_commits(
+    pub(crate) fn new(
         hudi_configs: Arc<HudiConfigs>,
-        storage_options: Arc<HashMap<String, String>>,
-        completed_commits: Vec<Instant>,
-    ) -> Result<Self> {
-        let storage = Storage::new(storage_options.clone(), 
hudi_configs.clone())?;
-        Ok(Self {
+        storage: Arc<Storage>,
+        active_loader: TimelineLoader,
+        archived_loader: Option<TimelineLoader>,
+    ) -> Self {
+        Self {
             hudi_configs,
             storage,
-            completed_commits,
-        })
+            active_loader,
+            archived_loader,
+            completed_commits: Vec::new(),
+        }
     }
 
     pub(crate) async fn new_from_storage(
@@ -73,58 +80,66 @@ impl Timeline {
         storage_options: Arc<HashMap<String, String>>,
     ) -> Result<Self> {
         let storage = Storage::new(storage_options.clone(), 
hudi_configs.clone())?;
+        let mut timeline = TimelineBuilder::new(hudi_configs, 
storage).build().await?;
         let selector = TimelineSelector::completed_actions_in_range(
             DEFAULT_LOADING_ACTIONS,
-            hudi_configs.clone(),
+            timeline.hudi_configs.clone(),
             None,
             None,
         )?;
-        let completed_commits = Self::load_instants(&selector, &storage, 
false).await?;
-        Ok(Self {
-            hudi_configs,
-            storage,
-            completed_commits,
-        })
+        timeline.completed_commits = timeline.load_instants(&selector, 
false).await?;
+        Ok(timeline)
     }
 
-    async fn load_instants(
+    /// Load instants from the timeline based on the selector criteria.
+    ///
+    /// # Archived Timeline Loading
+    ///
+    /// Archived instants are loaded only when BOTH conditions are met:
+    /// 1. The selector has a time filter (start or end timestamp)
+    /// 2. `TimelineArchivedReadEnabled` config is set to `true`
+    ///
+    /// This double-gate design ensures:
+    /// - Queries without time filters only read active timeline (optimization)
+    /// - Historical time-range queries can include archived data when 
explicitly enabled
+    ///
+    /// # Arguments
+    ///
+    /// * `selector` - The criteria for selecting instants (actions, states, 
time range)
+    /// * `desc` - If true, return instants in descending order by timestamp
+    pub async fn load_instants(
+        &self,
         selector: &TimelineSelector,
-        storage: &Storage,
         desc: bool,
     ) -> Result<Vec<Instant>> {
-        let files = storage.list_files(Some(HUDI_METADATA_DIR)).await?;
-
-        // For most cases, we load completed instants, so we can pre-allocate 
the vector with a
-        // capacity of 1/3 of the total number of listed files,
-        // ignoring requested and inflight instants.
-        let mut instants = Vec::with_capacity(files.len() / 3);
-
-        for file_info in files {
-            match selector.try_create_instant(file_info.name.as_str()) {
-                Ok(instant) => instants.push(instant),
-                Err(e) => {
-                    // Ignore files that are not valid or desired instants.
-                    debug!(
-                        "Instant not created from file {:?} due to: {:?}",
-                        file_info, e
-                    );
+        // If a time filter is present and we have an archived loader, include 
archived as well.
+        if selector.has_time_filter() {
+            let mut instants = self.active_loader.load_instants(selector, 
desc).await?;
+            if let Some(archived_loader) = &self.archived_loader {
+                let mut archived = archived_loader
+                    .load_archived_instants(selector, desc)
+                    .await?;
+                if !archived.is_empty() {
+                    // Each side is sorted by its own loader, but appending 
archived after active does not produce a globally ordered list.
+                    instants.append(&mut archived);
                 }
             }
-        }
-
-        instants.sort_unstable();
-
-        // As of current impl., we don't mutate instants once timeline is 
created,
-        // so we can save some memory by shrinking the capacity.
-        instants.shrink_to_fit();
-
-        if desc {
-            Ok(instants.into_iter().rev().collect())
-        } else {
             Ok(instants)
+        } else {
+            self.active_loader.load_instants(selector, desc).await
         }
     }
 
+    async fn load_instants_internal(
+        &self,
+        selector: &TimelineSelector,
+        desc: bool,
+    ) -> Result<Vec<Instant>> {
+        // Internal helper for the commit getters: reads only the active 
+        // timeline. Archived-aware loading goes through `load_instants`.
+        self.active_loader.load_instants(selector, desc).await
+    }
+
     /// Get the completed commit [Instant]s in the timeline.
     ///
     /// * For Copy-on-write tables, this includes commit instants.
@@ -136,7 +151,7 @@ impl Timeline {
     pub async fn get_completed_commits(&self, desc: bool) -> 
Result<Vec<Instant>> {
         let selector =
             
TimelineSelector::completed_commits_in_range(self.hudi_configs.clone(), None, 
None)?;
-        Self::load_instants(&selector, &self.storage, desc).await
+        self.load_instants_internal(&selector, desc).await
     }
 
     /// Get the completed deltacommit [Instant]s in the timeline.
@@ -152,7 +167,7 @@ impl Timeline {
             None,
             None,
         )?;
-        Self::load_instants(&selector, &self.storage, desc).await
+        self.load_instants_internal(&selector, desc).await
     }
 
     /// Get the completed replacecommit [Instant]s in the timeline.
@@ -166,7 +181,7 @@ impl Timeline {
             None,
             None,
         )?;
-        Self::load_instants(&selector, &self.storage, desc).await
+        self.load_instants_internal(&selector, desc).await
     }
 
     /// Get the completed clustering commit [Instant]s in the timeline.
@@ -180,7 +195,7 @@ impl Timeline {
             None,
             None,
         )?;
-        let instants = Self::load_instants(&selector, &self.storage, 
desc).await?;
+        let instants = self.load_instants_internal(&selector, desc).await?;
         let mut clustering_instants = Vec::new();
         for instant in instants {
             let metadata = self.get_instant_metadata(&instant).await?;
@@ -198,19 +213,14 @@ impl Timeline {
     }
 
     async fn get_instant_metadata(&self, instant: &Instant) -> 
Result<Map<String, Value>> {
-        let path = instant.relative_path()?;
-        let bytes = self.storage.get_file_data(path.as_str()).await?;
-
-        serde_json::from_slice(&bytes)
-            .map_err(|e| CoreError::Timeline(format!("Failed to get commit 
metadata: {}", e)))
+        self.active_loader.load_instant_metadata(instant).await
     }
 
     /// Get the instant metadata in JSON format.
     pub async fn get_instant_metadata_in_json(&self, instant: &Instant) -> 
Result<String> {
-        let path = instant.relative_path()?;
-        let bytes = self.storage.get_file_data(path.as_str()).await?;
-        String::from_utf8(bytes.to_vec())
-            .map_err(|e| CoreError::Timeline(format!("Failed to get commit 
metadata: {}", e)))
+        self.active_loader
+            .load_instant_metadata_as_json(instant)
+            .await
     }
 
     pub(crate) async fn get_latest_commit_metadata(&self) -> 
Result<Map<String, Value>> {
@@ -322,14 +332,102 @@ mod tests {
 
     use crate::config::table::HudiTableConfig;
     use crate::metadata::meta_field::MetaField;
+    use crate::timeline::instant::{Action, State};
+    #[tokio::test]
+    async fn test_timeline_v8_nonpartitioned() {
+        let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+        let timeline = create_test_timeline(base_url).await;
+        assert_eq!(timeline.completed_commits.len(), 2);
+        assert!(timeline.active_loader.is_layout_two_active());
+        // Archived loader should be None when TimelineArchivedReadEnabled is 
false (default)
+        assert!(timeline.archived_loader.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_timeline_v8_with_archived_enabled() {
+        use 
crate::config::internal::HudiInternalConfig::TimelineArchivedReadEnabled;
+
+        let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+
+        // Build initial configs with base path and archived read enabled
+        let mut options_map = HashMap::new();
+        options_map.insert(
+            HudiTableConfig::BasePath.as_ref().to_string(),
+            base_url.to_string(),
+        );
+        options_map.insert(
+            TimelineArchivedReadEnabled.as_ref().to_string(),
+            "true".to_string(),
+        );
+
+        let storage = Storage::new(
+            Arc::new(HashMap::new()),
+            Arc::new(HudiConfigs::new(options_map.clone())),
+        )
+        .unwrap();
+
+        let table_properties = crate::config::util::parse_data_for_options(
+            &storage
+                .get_file_data(".hoodie/hoodie.properties")
+                .await
+                .unwrap(),
+            "=",
+        )
+        .unwrap();
+        options_map.extend(table_properties);
+        let hudi_configs = Arc::new(HudiConfigs::new(options_map));
+
+        let timeline = TimelineBuilder::new(hudi_configs, storage)
+            .build()
+            .await
+            .unwrap();
+
+        // When TimelineArchivedReadEnabled is true, archived loader should be 
created
+        assert!(timeline.active_loader.is_layout_two_active());
+        assert!(timeline
+            .archived_loader
+            .as_ref()
+            .map(|l| l.is_layout_two_archived())
+            .unwrap_or(false));
+    }
 
     async fn create_test_timeline(base_url: Url) -> Timeline {
-        Timeline::new_from_storage(
-            Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, 
base_url)])),
+        let storage = Storage::new(
             Arc::new(HashMap::new()),
+            Arc::new(HudiConfigs::new([(
+                HudiTableConfig::BasePath,
+                base_url.to_string(),
+            )])),
         )
-        .await
-        .unwrap()
+        .unwrap();
+
+        let hudi_configs = HudiConfigs::new([(HudiTableConfig::BasePath, 
base_url.to_string())]);
+        let table_properties = crate::config::util::parse_data_for_options(
+            &storage
+                .get_file_data(".hoodie/hoodie.properties")
+                .await
+                .unwrap(),
+            "=",
+        )
+        .unwrap();
+        let mut hudi_configs_map = hudi_configs.as_options();
+        hudi_configs_map.extend(table_properties);
+        let hudi_configs = Arc::new(HudiConfigs::new(hudi_configs_map));
+
+        let mut timeline = TimelineBuilder::new(hudi_configs, storage)
+            .build()
+            .await
+            .unwrap();
+
+        let selector = TimelineSelector::completed_actions_in_range(
+            DEFAULT_LOADING_ACTIONS,
+            timeline.hudi_configs.clone(),
+            None,
+            None,
+        )
+        .unwrap();
+        timeline.completed_commits = timeline.load_instants(&selector, 
false).await.unwrap();
+        timeline
     }
 
     #[tokio::test]
@@ -385,7 +483,12 @@ mod tests {
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert!(matches!(err, CoreError::Timeline(_)));
-        assert!(err.to_string().contains("Failed to get commit metadata"));
+        // Error message changed to be more specific about JSON parsing
+        assert!(
+            err.to_string()
+                .contains("Failed to parse JSON commit metadata")
+                || err.to_string().contains("EOF while parsing")
+        );
 
         let instant = Instant::from_str("20240402144910683.commit").unwrap();
 
@@ -394,7 +497,12 @@ mod tests {
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert!(matches!(err, CoreError::Timeline(_)));
-        assert!(err.to_string().contains("Failed to get commit metadata"));
+        // Error message changed to be more specific about JSON parsing
+        assert!(
+            err.to_string()
+                .contains("Failed to parse JSON commit metadata")
+                || err.to_string().contains("expected value")
+        );
     }
 
     #[tokio::test]
@@ -498,4 +606,97 @@ mod tests {
             );
         }
     }
+
+    #[tokio::test]
+    async fn test_get_completed_commits() {
+        let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+        let timeline = create_test_timeline(base_url).await;
+
+        let commits = timeline.get_completed_commits(false).await.unwrap();
+        assert!(!commits.is_empty());
+        // All should be commits in completed state
+        for instant in &commits {
+            assert_eq!(instant.action, Action::Commit);
+            assert_eq!(instant.state, State::Completed);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_completed_deltacommits() {
+        let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+        let timeline = create_test_timeline(base_url).await;
+
+        let deltacommits = 
timeline.get_completed_deltacommits(false).await.unwrap();
+        // All should be deltacommits (or empty if none exist)
+        for instant in &deltacommits {
+            assert_eq!(instant.action, Action::DeltaCommit);
+            assert_eq!(instant.state, State::Completed);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_completed_replacecommits() {
+        let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+        let timeline = create_test_timeline(base_url).await;
+
+        let replacecommits = 
timeline.get_completed_replacecommits(false).await.unwrap();
+        // All should be replacecommits (or empty if none exist)
+        for instant in &replacecommits {
+            assert!(instant.action.is_replacecommit());
+            assert_eq!(instant.state, State::Completed);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_commits_descending_order() {
+        let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+        let timeline = create_test_timeline(base_url).await;
+
+        let commits_asc = timeline.get_completed_commits(false).await.unwrap();
+        let commits_desc = timeline.get_completed_commits(true).await.unwrap();
+
+        assert_eq!(commits_asc.len(), commits_desc.len());
+        if !commits_asc.is_empty() {
+            // Verify descending order is reverse of ascending
+            assert_eq!(commits_asc.first(), commits_desc.last());
+            assert_eq!(commits_asc.last(), commits_desc.first());
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_instant_metadata_in_json() {
+        let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+        let timeline = create_test_timeline(base_url).await;
+
+        let commits = timeline.get_completed_commits(false).await.unwrap();
+        if let Some(instant) = commits.first() {
+            let json = timeline
+                .get_instant_metadata_in_json(instant)
+                .await
+                .unwrap();
+            // Should be valid JSON
+            assert!(serde_json::from_str::<serde_json::Value>(&json).is_ok());
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_latest_commit_timestamp() {
+        let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+        let timeline = create_test_timeline(base_url).await;
+
+        let timestamp = timeline.get_latest_commit_timestamp().unwrap();
+        assert!(!timestamp.is_empty());
+        // Should be in timeline timestamp format
+        assert!(timestamp.len() >= 14);
+    }
+
+    #[tokio::test]
+    async fn test_get_latest_commit_timestamp_as_option() {
+        let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
+        let timeline = create_test_timeline(base_url).await;
+
+        let timestamp = timeline.get_latest_commit_timestamp_as_option();
+        assert!(timestamp.is_some());
+        assert!(!timestamp.unwrap().is_empty());
+    }
 }
diff --git a/crates/core/src/timeline/selector.rs 
b/crates/core/src/timeline/selector.rs
index 8dca297..cb98378 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -142,6 +142,10 @@ pub struct TimelineSelector {
     states: Vec<State>,
     actions: Vec<Action>,
     include_archived: bool,
+    /// Timeline layout version determines instant format validation:
+    /// - Layout 1 (pre-v8): expects `{timestamp}.{action}` for completed 
instants
+    /// - Layout 2 (v8+): expects 
`{requestedTimestamp}_{completedTimestamp}.{action}` for completed instants
+    timeline_layout_version: isize,
 }
 
 #[allow(dead_code)]
@@ -152,6 +156,25 @@ impl TimelineSelector {
             .into()
     }
 
+    fn get_timeline_layout_version_from_configs(hudi_configs: &HudiConfigs) -> 
isize {
+        // Try to get layout version from config, otherwise infer from table 
version
+        if let Some(layout_version) = 
hudi_configs.try_get(HudiTableConfig::TimelineLayoutVersion) {
+            layout_version.into()
+        } else {
+            // Apply same default logic as TimelineBuilder:
+            // v8+ tables default to layout 2, earlier versions default to 
layout 1
+            let table_version: isize = hudi_configs
+                .try_get(HudiTableConfig::TableVersion)
+                .map(|v| v.into())
+                .unwrap_or(6); // Conservative default if table version is 
somehow missing
+            if table_version >= 8 {
+                2
+            } else {
+                1
+            }
+        }
+    }
+
     fn parse_datetime(timezone: &str, timestamp: Option<&str>) -> 
Result<Option<DateTime<Utc>>> {
         timestamp
             .map(|e| Instant::parse_datetime(e, timezone))
@@ -165,6 +188,7 @@ impl TimelineSelector {
         end: Option<&str>,
     ) -> Result<Self> {
         let timezone = Self::get_timezone_from_configs(&hudi_configs);
+        let timeline_layout_version = 
Self::get_timeline_layout_version_from_configs(&hudi_configs);
         let start_datetime = Self::parse_datetime(&timezone, start)?;
         let end_datetime = Self::parse_datetime(&timezone, end)?;
         Ok(Self {
@@ -174,6 +198,7 @@ impl TimelineSelector {
             states: vec![State::Completed],
             actions: actions.to_vec(),
             include_archived: false,
+            timeline_layout_version,
         })
     }
 
@@ -201,6 +226,11 @@ impl TimelineSelector {
         Self::completed_actions_in_range(&[Action::ReplaceCommit], 
hudi_configs, start, end)
     }
 
+    /// Whether the selector has any time filter (start or end) applied.
+    pub fn has_time_filter(&self) -> bool {
+        self.start_datetime.is_some() || self.end_datetime.is_some()
+    }
+
     pub fn should_include_action(&self, action: &Action) -> bool {
         self.actions.is_empty() || self.actions.contains(action)
     }
@@ -210,7 +240,7 @@ impl TimelineSelector {
     }
 
     pub fn try_create_instant(&self, file_name: &str) -> Result<Instant> {
-        let (timestamp, action_suffix) = 
file_name.split_once('.').ok_or_else(|| {
+        let (timestamp_part, action_suffix) = 
file_name.split_once('.').ok_or_else(|| {
             CoreError::Timeline(format!(
                 "Instant not created due to invalid file name: {file_name}"
             ))
@@ -230,6 +260,40 @@ impl TimelineSelector {
             )));
         }
 
+        // Handle v8+ completed instant format: 
{requestedTimestamp}_{completedTimestamp}.{action}
+        // Validate format based on timeline layout version and instant state
+        let (timestamp, completed_timestamp) = if let Some((requested_ts, 
completed_ts)) =
+            timestamp_part.split_once('_')
+        {
+            // Found underscore format - this should be a v8+ (layout 2) 
completed instant
+            if self.timeline_layout_version == 1 && state == State::Completed {
+                return Err(CoreError::Timeline(format!(
+                    "Unexpected v8+ instant format in timeline layout v1: 
{file_name}"
+                )));
+            }
+
+            // Validate both timestamps
+            if requested_ts.len() != 17 && requested_ts.len() != 14 {
+                return Err(CoreError::Timeline(format!(
+                    "Invalid requested timestamp in v8+ format: {file_name}"
+                )));
+            }
+            if completed_ts.len() != 17 && completed_ts.len() != 14 {
+                return Err(CoreError::Timeline(format!(
+                    "Invalid completed timestamp in v8+ format: {file_name}"
+                )));
+            }
+            (requested_ts, Some(completed_ts.to_string()))
+        } else {
+            // No underscore format - this should be a pre-v8 instant OR a 
non-completed v8+ instant
+            if self.timeline_layout_version == 2 && state == State::Completed {
+                return Err(CoreError::Timeline(format!(
+                    "Expected v8+ instant format (with completed timestamp) in 
timeline layout v2 for completed instant: {file_name}"
+                )));
+            }
+            (timestamp_part, None)
+        };
+
         let dt = Instant::parse_datetime(timestamp, &self.timezone)?;
         if let Some(start) = self.start_datetime {
             if dt < start {
@@ -251,6 +315,7 @@ impl TimelineSelector {
 
         Ok(Instant {
             timestamp: timestamp.to_string(),
+            completed_timestamp,
             epoch_millis: dt.timestamp_millis(),
             action,
             state,
@@ -306,6 +371,11 @@ mod tests {
     use super::*;
     use crate::config::table::HudiTableConfig;
     use crate::config::HudiConfigs;
+    use crate::storage::Storage;
+    use crate::timeline::builder::TimelineBuilder;
+    use crate::timeline::instant::{Action, Instant, State};
+    use crate::timeline::Timeline;
+    use chrono::{DateTime, Utc};
     use std::collections::HashMap;
     use std::str::FromStr;
     use std::sync::Arc;
@@ -470,6 +540,7 @@ mod tests {
             states: states.to_vec(),
             actions: actions.to_vec(),
             include_archived: false,
+            timeline_layout_version: 1, // Default to layout v1 for tests
         }
     }
 
@@ -523,7 +594,25 @@ mod tests {
     }
 
     async fn create_test_timeline() -> Timeline {
-        let instants = vec![
+        let storage = Storage::new(
+            Arc::new(HashMap::new()),
+            Arc::new(HudiConfigs::new([
+                (HudiTableConfig::BasePath, "file:///tmp/base"),
+                (HudiTableConfig::TableVersion, "6"),
+            ])),
+        )
+        .unwrap();
+        let mut timeline = TimelineBuilder::new(
+            Arc::new(HudiConfigs::new([
+                (HudiTableConfig::BasePath, "file:///tmp/base"),
+                (HudiTableConfig::TableVersion, "6"),
+            ])),
+            storage,
+        )
+        .build()
+        .await
+        .unwrap();
+        timeline.completed_commits = vec![
             Instant::from_str("20240103153000.commit").unwrap(),
             Instant::from_str("20240103153010999.commit").unwrap(),
             Instant::from_str("20240103153020999.commit.requested").unwrap(),
@@ -531,16 +620,7 @@ mod tests {
             Instant::from_str("20240103153020999.commit").unwrap(),
             Instant::from_str("20240103153030999.commit").unwrap(),
         ];
-        Timeline::new_from_completed_commits(
-            Arc::new(HudiConfigs::new([(
-                HudiTableConfig::BasePath,
-                "file:///tmp/base",
-            )])),
-            Arc::new(HashMap::new()),
-            instants,
-        )
-        .await
-        .unwrap()
+        timeline
     }
 
     #[tokio::test]
@@ -555,6 +635,7 @@ mod tests {
             end_datetime: None,
             timezone: "UTC".to_string(),
             include_archived: false,
+            timeline_layout_version: 1,
         };
         assert!(selector.select(&timeline).unwrap().is_empty());
     }
@@ -570,9 +651,76 @@ mod tests {
             end_datetime: end.map(|s| Instant::parse_datetime(s, 
"UTC").unwrap()),
             timezone: "UTC".to_string(),
             include_archived: false,
+            timeline_layout_version: 1,
         }
     }
 
+    #[test]
+    fn test_layout_version_validation() {
+        // Test layout v1 - should reject v8+ format for completed instants
+        let selector_v1 = TimelineSelector {
+            timezone: "UTC".to_string(),
+            start_datetime: None,
+            end_datetime: None,
+            states: vec![State::Completed],
+            actions: vec![Action::DeltaCommit],
+            include_archived: false,
+            timeline_layout_version: 1,
+        };
+
+        // v8+ format should be rejected for layout v1
+        let result = 
selector_v1.try_create_instant("20240103153000_20240103153001.deltacommit");
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("Unexpected v8+ instant format in timeline layout v1"));
+
+        // pre-v8 format should work for layout v1
+        assert!(selector_v1
+            .try_create_instant("20240103153000.deltacommit")
+            .is_ok());
+
+        // Test layout v2 - should reject pre-v8 format for completed instants
+        let selector_v2 = TimelineSelector {
+            timezone: "UTC".to_string(),
+            start_datetime: None,
+            end_datetime: None,
+            states: vec![State::Completed],
+            actions: vec![Action::DeltaCommit],
+            include_archived: false,
+            timeline_layout_version: 2,
+        };
+
+        // pre-v8 format should be rejected for layout v2 completed instants
+        let result = 
selector_v2.try_create_instant("20240103153000.deltacommit");
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("Expected v8+ instant format"));
+
+        // v8+ format should work for layout v2
+        assert!(selector_v2
+            .try_create_instant("20240103153000_20240103153001.deltacommit")
+            .is_ok());
+
+        // Non-completed instants (inflight, requested) should work with 
standard format in both layouts
+        let selector_v2_inflight = TimelineSelector {
+            timezone: "UTC".to_string(),
+            start_datetime: None,
+            end_datetime: None,
+            states: vec![State::Inflight],
+            actions: vec![Action::DeltaCommit],
+            include_archived: false,
+            timeline_layout_version: 2,
+        };
+
+        assert!(selector_v2_inflight
+            .try_create_instant("20240103153000.deltacommit.inflight")
+            .is_ok());
+    }
+
     #[tokio::test]
     async fn test_timestamp_filtering() -> Result<()> {
         let timeline = create_test_timeline().await;
diff --git a/crates/core/tests/data/commit_metadata/v6_commit.json 
b/crates/core/tests/data/commit_metadata/v6_commit.json
new file mode 100644
index 0000000..511619d
--- /dev/null
+++ b/crates/core/tests/data/commit_metadata/v6_commit.json
@@ -0,0 +1,38 @@
+{
+  "partitionToWriteStats" : {
+    "" : [ {
+      "fileId" : "a079bdb3-731c-4894-b855-abfcd6921007-0",
+      "path" : 
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
+      "cdcStats" : null,
+      "prevCommit" : "20240418173550988",
+      "numWrites" : 4,
+      "numDeletes" : 0,
+      "numUpdateWrites" : 1,
+      "numInserts" : 1,
+      "totalWriteBytes" : 441520,
+      "totalWriteErrors" : 0,
+      "tempPath" : null,
+      "partitionPath" : "",
+      "totalLogRecords" : 0,
+      "totalLogFilesCompacted" : 0,
+      "totalLogSizeCompacted" : 0,
+      "totalUpdatedRecordsCompacted" : 0,
+      "totalLogBlocks" : 0,
+      "totalCorruptLogBlock" : 0,
+      "totalRollbackBlocks" : 0,
+      "fileSizeInBytes" : 441520,
+      "minEventTime" : null,
+      "maxEventTime" : null,
+      "runtimeStats" : {
+        "totalScanTime" : 0,
+        "totalUpsertTime" : 105,
+        "totalCreateTime" : 0
+      }
+    } ]
+  },
+  "compacted" : false,
+  "extraMetadata" : {
+    "schema" : 
"{\"type\":\"record\",\"name\":\"v6_nonpartitioned_record\",\"namespace\":\"hoodie.v6_nonpartitioned\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"isActive\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"byteField\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"shortField\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"intFi
 [...]
+  },
+  "operationType" : "UPSERT"
+}
\ No newline at end of file
diff --git a/crates/core/tests/data/commit_metadata/v8_deltacommit.avro 
b/crates/core/tests/data/commit_metadata/v8_deltacommit.avro
new file mode 100644
index 0000000..d8000be
Binary files /dev/null and 
b/crates/core/tests/data/commit_metadata/v8_deltacommit.avro differ
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/hoodie.properties
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/hoodie.properties
new file mode 100644
index 0000000..dd1798c
--- /dev/null
+++ 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_cow/.hoodie/hoodie.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.table.name=commits_load_schema_from_base_file_cow
+hoodie.table.type=COPY_ON_WRITE
+hoodie.table.version=6
+hoodie.table.timeline.layout.version=1
diff --git 
a/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/hoodie.properties
 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/hoodie.properties
new file mode 100644
index 0000000..51853f9
--- /dev/null
+++ 
b/crates/core/tests/data/timeline/commits_load_schema_from_base_file_mor/.hoodie/hoodie.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.table.name=commits_load_schema_from_base_file_mor
+hoodie.table.type=MERGE_ON_READ
+hoodie.table.version=6
+hoodie.table.timeline.layout.version=1
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 9ee6094..662548b 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -54,26 +54,29 @@ use hudi_core::table::Table as HudiTable;
 ///
 /// # Examples
 ///
-/// ```rust
+/// ```rust,no_run
 /// use std::sync::Arc;
 ///
 /// use datafusion::error::Result;
 /// use datafusion::prelude::{DataFrame, SessionContext};
 /// use hudi_datafusion::HudiDataSource;
 ///
-/// // Initialize a new DataFusion session context
-/// let ctx = SessionContext::new();
+/// #[tokio::main]
+/// async fn main() -> Result<()> {
+///     // Initialize a new DataFusion session context
+///     let ctx = SessionContext::new();
 ///
-/// // Create a new HudiDataSource with specific read options
-/// let hudi = HudiDataSource::new_with_options(
-///     "/tmp/trips_table",
-///     [("hoodie.read.input.partitions", 5)]).await?;
-///
-/// // Register the Hudi table with the session context
-/// ctx.register_table("trips_table", Arc::new(hudi))?;
-/// let df: DataFrame = ctx.sql("SELECT * from trips_table where city = 'san_francisco'").await?;
-/// df.show().await?;
+///     // Create a new HudiDataSource with specific read options
+///     let hudi = HudiDataSource::new_with_options(
+///         "/tmp/trips_table",
+///         [("hoodie.read.input.partitions", "5")]).await?;
 ///
+///     // Register the Hudi table with the session context
+///     ctx.register_table("trips_table", Arc::new(hudi))?;
+///     let df: DataFrame = ctx.sql("SELECT * from trips_table where city = 'san_francisco'").await?;
+///     df.show().await?;
+///     Ok(())
+/// }
 /// ```
 #[derive(Clone, Debug)]
 pub struct HudiDataSource {
@@ -254,30 +257,27 @@ impl TableProvider for HudiDataSource {
 ///
 /// Creating a new `HudiTableFactory` instance:
 ///
-/// ```rust
+/// ```rust,no_run
 /// use datafusion::prelude::SessionContext;
+/// use datafusion::catalog::TableProviderFactory;
+/// use datafusion::sql::parser::CreateExternalTable;
 /// use hudi_datafusion::HudiTableFactory;
 ///
-/// // Initialize a new HudiTableFactory
-/// let factory = HudiTableFactory::new();
+/// #[tokio::main]
+/// async fn main() -> datafusion::error::Result<()> {
+///     // Initialize a new HudiTableFactory
+///     let factory = HudiTableFactory::new();
 ///     
-/// // The factory can be used to create Hudi tables
-/// let table = factory.create_table(...)?;
-///
-///
-/// // Using `CREATE EXTERNAL TABLE` to register Hudi table:
-/// let test_table =  factory.create_table(...)?; // Table with path + url
-/// let ctx = SessionContext::new();
-///
-/// // Register table in session using `CREATE EXTERNAL TABLE` command
-/// let create_table_sql = format!(
-///     "CREATE EXTERNAL TABLE {} STORED AS HUDI LOCATION '{}' {}",
-///     test_table.as_ref(),
-///     test_table.path(),
-///     concat_as_sql_options(options)
-/// );
-/// ctx.sql(create_table_sql.as_str()).await?; // Query against this table
-///
+///     // Initialize a new DataFusion session context
+///     let ctx = SessionContext::new();
+///     
+///     // Register table using SQL command
+///     let create_table_sql =
+///         "CREATE EXTERNAL TABLE trips_table STORED AS HUDI LOCATION '/tmp/trips_table'";
+///     ctx.sql(create_table_sql).await?;
+///     
+///     Ok(())
+/// }
 /// ```
 #[derive(Debug)]
 pub struct HudiTableFactory {}

Reply via email to