This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 199a25d  feat: add APIs for time-travel read (#33)
199a25d is described below

commit 199a25d82ba09c0bedeb12430bf22299603209b2
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Tue Jul 2 15:48:05 2024 -0500

    feat: add APIs for time-travel read (#33)
---
 crates/core/src/file_group/mod.rs | 27 ++++++++---
 crates/core/src/storage/mod.rs    | 12 ++---
 crates/core/src/table/fs_view.rs  | 38 +++++++++------
 crates/core/src/table/mod.rs      | 98 +++++++++++++++++++++++++++++++++++----
 crates/core/src/table/timeline.rs | 70 ++++++++++++++++++++--------
 crates/datafusion/src/lib.rs      | 14 ++----
 crates/tests/src/lib.rs           |  7 +++
 python/hudi/_internal.pyi         | 12 +++--
 python/hudi/table.py              | 22 +++++----
 python/src/lib.rs                 | 31 ++++++++-----
 python/tests/test_table_read.py   | 44 ++++++++++++++----
 11 files changed, 279 insertions(+), 96 deletions(-)

diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
index 6b9b22c..c0af0b3 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -100,7 +100,7 @@ impl FileSlice {
         if self.base_file.stats.is_none() {
             let parquet_meta = storage
                 .get_parquet_file_metadata(&self.base_file_relative_path())
-                .await;
+                .await?;
             let num_records = parquet_meta.file_metadata().num_rows();
             let stats = FileStats { num_records };
             self.base_file.stats = Some(stats);
@@ -163,12 +163,22 @@ impl FileGroup {
         }
     }
 
-    pub fn get_latest_file_slice(&self) -> Option<&FileSlice> {
-        return self.file_slices.values().next_back();
+    pub fn get_file_slice_as_of(&self, timestamp: &str) -> Option<&FileSlice> {
+        let as_of = timestamp.to_string();
+        return if let Some((_, file_slice)) = 
self.file_slices.range(..=as_of).next_back() {
+            Some(file_slice)
+        } else {
+            None
+        };
     }
 
-    pub fn get_latest_file_slice_mut(&mut self) -> Option<&mut FileSlice> {
-        return self.file_slices.values_mut().next_back();
+    pub fn get_file_slice_mut_as_of(&mut self, timestamp: &str) -> Option<&mut 
FileSlice> {
+        let as_of = timestamp.to_string();
+        return if let Some((_, file_slice)) = 
self.file_slices.range_mut(..=as_of).next_back() {
+            Some(file_slice)
+        } else {
+            None
+        };
     }
 }
 
@@ -203,8 +213,11 @@ mod tests {
         let commit_times: Vec<&str> = fg.file_slices.keys().map(|k| 
k.as_str()).collect();
         assert_eq!(commit_times, vec!["20240402123035233", 
"20240402144910683"]);
         assert_eq!(
-            fg.get_latest_file_slice().unwrap().base_file.commit_time,
-            "20240402144910683"
+            fg.get_file_slice_as_of("20240402123035233")
+                .unwrap()
+                .base_file
+                .commit_time,
+            "20240402123035233"
         )
     }
 
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 76e085f..0f09c05 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -72,14 +72,14 @@ impl Storage {
         }
     }
 
-    pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> 
ParquetMetaData {
-        let obj_url = join_url_segments(&self.base_url, 
&[relative_path]).unwrap();
-        let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
+    pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> 
Result<ParquetMetaData> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
         let obj_store = self.object_store.clone();
-        let meta = obj_store.head(&obj_path).await.unwrap();
+        let meta = obj_store.head(&obj_path).await?;
         let reader = ParquetObjectReader::new(obj_store, meta);
-        let builder = 
ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
-        builder.metadata().as_ref().to_owned()
+        let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+        Ok(builder.metadata().as_ref().to_owned())
     }
 
     pub async fn get_file_data(&self, relative_path: &str) -> Bytes {
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 8f278dd..b7cd77c 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -44,7 +44,7 @@ impl FileSystemView {
         props: Arc<HashMap<String, String>>,
     ) -> Result<Self> {
         let storage = Storage::new(base_url, storage_options)?;
-        let partition_paths = Self::get_partition_paths(&storage).await?;
+        let partition_paths = Self::load_partition_paths(&storage).await?;
         let partition_to_file_groups =
             Self::load_file_groups_for_partitions(&storage, 
partition_paths).await?;
         let partition_to_file_groups = 
Arc::new(DashMap::from_iter(partition_to_file_groups));
@@ -55,7 +55,7 @@ impl FileSystemView {
         })
     }
 
-    async fn get_partition_paths(storage: &Storage) -> Result<Vec<String>> {
+    async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> {
         let top_level_dirs: Vec<String> = storage
             .list_dirs(None)
             .await
@@ -120,24 +120,25 @@ impl FileSystemView {
         Ok(file_groups)
     }
 
-    pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>> {
+    pub fn get_file_slices_as_of(&self, timestamp: &str) -> 
Result<Vec<FileSlice>> {
         let mut file_slices = Vec::new();
         for fgs in self.partition_to_file_groups.iter() {
             let fgs_ref = fgs.value();
             for fg in fgs_ref {
-                if let Some(fsl) = fg.get_latest_file_slice() {
-                    file_slices.push(fsl.clone())
+                if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
+                    // TODO: pass ref instead of copying
+                    file_slices.push(fsl.clone());
                 }
             }
         }
         Ok(file_slices)
     }
 
-    pub async fn load_latest_file_slices_stats(&self) -> Result<()> {
+    pub async fn load_file_slices_stats_as_of(&self, timestamp: &str) -> 
Result<()> {
         for mut fgs in self.partition_to_file_groups.iter_mut() {
             let fgs_ref = fgs.value_mut();
             for fg in fgs_ref {
-                if let Some(file_slice) = fg.get_latest_file_slice_mut() {
+                if let Some(file_slice) = 
fg.get_file_slice_mut_as_of(timestamp) {
                     file_slice
                         .load_stats(&self.storage)
                         .await
@@ -148,11 +149,17 @@ impl FileSystemView {
         Ok(())
     }
 
-    pub async fn read_file_slice_by_path(&self, relative_path: &str) -> 
Result<Vec<RecordBatch>> {
+    pub async fn read_file_slice_by_path_unchecked(
+        &self,
+        relative_path: &str,
+    ) -> Result<Vec<RecordBatch>> {
         Ok(self.storage.get_parquet_file_data(relative_path).await)
     }
-    pub async fn read_file_slice(&self, file_slice: &FileSlice) -> 
Result<Vec<RecordBatch>> {
-        self.read_file_slice_by_path(&file_slice.base_file_relative_path())
+    pub async fn read_file_slice_unchecked(
+        &self,
+        file_slice: &FileSlice,
+    ) -> Result<Vec<RecordBatch>> {
+        
self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
             .await
     }
 }
@@ -171,7 +178,9 @@ mod tests {
     async fn get_partition_paths_for_nonpartitioned_table() {
         let base_url = TestTable::V6Nonpartitioned.url();
         let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
-        let partition_paths = 
FileSystemView::get_partition_paths(&storage).await.unwrap();
+        let partition_paths = FileSystemView::load_partition_paths(&storage)
+            .await
+            .unwrap();
         let partition_path_set: HashSet<&str> =
             HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
         assert_eq!(partition_path_set, HashSet::from([""]))
@@ -181,7 +190,9 @@ mod tests {
     async fn get_partition_paths_for_complexkeygen_table() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
         let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
-        let partition_paths = 
FileSystemView::get_partition_paths(&storage).await.unwrap();
+        let partition_paths = FileSystemView::load_partition_paths(&storage)
+            .await
+            .unwrap();
         let partition_path_set: HashSet<&str> =
             HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
         assert_eq!(
@@ -204,7 +215,8 @@ mod tests {
         )
         .await
         .unwrap();
-        let file_slices = fs_view.get_latest_file_slices().unwrap();
+
+        let file_slices = 
fs_view.get_file_slices_as_of("20240418173551906").unwrap();
         assert_eq!(file_slices.len(), 1);
         let fg_ids = file_slices
             .iter()
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 825b2d4..52daf12 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -115,11 +115,42 @@ impl Table {
     }
 
     pub async fn get_file_slices(&self) -> Result<Vec<FileSlice>> {
+        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+            self.get_file_slices_as_of(timestamp).await
+        } else {
+            Ok(Vec::new())
+        }
+    }
+
+    pub async fn get_file_slices_as_of(&self, timestamp: &str) -> 
Result<Vec<FileSlice>> {
         self.file_system_view
-            .load_latest_file_slices_stats()
+            .load_file_slices_stats_as_of(timestamp)
             .await
-            .expect("Successful loading of file slice stats.");
-        self.file_system_view.get_latest_file_slices()
+            .context("Fail to load file slice stats.")?;
+        self.file_system_view.get_file_slices_as_of(timestamp)
+    }
+
+    pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> {
+        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+            self.read_snapshot_as_of(timestamp).await
+        } else {
+            Ok(Vec::new())
+        }
+    }
+
+    pub async fn read_snapshot_as_of(&self, timestamp: &str) -> 
Result<Vec<RecordBatch>> {
+        let file_slices = self
+            .get_file_slices_as_of(timestamp)
+            .await
+            .context(format!("Failed to get file slices as of {}", 
timestamp))?;
+        let mut batches = Vec::new();
+        for f in file_slices {
+            match self.file_system_view.read_file_slice_unchecked(&f).await {
+                Ok(batch) => batches.extend(batch),
+                Err(e) => return Err(anyhow!("Failed to read file slice {:?} - 
{}", f, e)),
+            }
+        }
+        Ok(batches)
     }
 
     #[cfg(test)]
@@ -133,13 +164,9 @@ impl Table {
 
     pub async fn read_file_slice_by_path(&self, relative_path: &str) -> 
Result<Vec<RecordBatch>> {
         self.file_system_view
-            .read_file_slice_by_path(relative_path)
+            .read_file_slice_by_path_unchecked(relative_path)
             .await
     }
-
-    pub async fn read_file_slice(&self, file_slice: &FileSlice) -> 
Result<Vec<RecordBatch>> {
-        self.file_system_view.read_file_slice(file_slice).await
-    }
 }
 
 impl ProvidesTableMetadata for Table {
@@ -240,7 +267,7 @@ mod tests {
     use crate::table::Table;
 
     #[tokio::test]
-    async fn hudi_table_get_latest_schema() {
+    async fn hudi_table_get_schema() {
         let base_url = TestTable::V6Nonpartitioned.url();
         let hudi_table = Table::new(base_url.path(), 
HashMap::new()).await.unwrap();
         let fields: Vec<String> = hudi_table
@@ -308,7 +335,7 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn hudi_table_get_latest_file_paths() {
+    async fn hudi_table_get_file_paths() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
         let hudi_table = Table::new(base_url.path(), 
HashMap::new()).await.unwrap();
         assert_eq!(hudi_table.timeline.instants.len(), 2);
@@ -324,6 +351,57 @@ mod tests {
         assert_eq!(actual, expected);
     }
 
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_as_of_timestamps() {
+        let base_url = TestTable::V6Nonpartitioned.url();
+        let hudi_table = Table::new(base_url.path(), 
HashMap::new()).await.unwrap();
+
+        let file_slices = hudi_table.get_file_slices().await.unwrap();
+        assert_eq!(
+            file_slices
+                .iter()
+                .map(|f| f.base_file_relative_path())
+                .collect::<Vec<_>>(),
+            
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
+        );
+
+        // as of the latest timestamp
+        let file_slices = hudi_table
+            .get_file_slices_as_of("20240418173551906")
+            .await
+            .unwrap();
+        assert_eq!(
+            file_slices
+                .iter()
+                .map(|f| f.base_file_relative_path())
+                .collect::<Vec<_>>(),
+            
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
+        );
+
+        // as of just smaller than the latest timestamp
+        let file_slices = hudi_table
+            .get_file_slices_as_of("20240418173551905")
+            .await
+            .unwrap();
+        assert_eq!(
+            file_slices
+                .iter()
+                .map(|f| f.base_file_relative_path())
+                .collect::<Vec<_>>(),
+            
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",]
+        );
+
+        // as of non-exist old timestamp
+        let file_slices = hudi_table.get_file_slices_as_of("0").await.unwrap();
+        assert_eq!(
+            file_slices
+                .iter()
+                .map(|f| f.base_file_relative_path())
+                .collect::<Vec<_>>(),
+            Vec::<String>::new()
+        );
+    }
+
     #[tokio::test]
     async fn hudi_table_get_table_metadata() {
         let base_path =
diff --git a/crates/core/src/table/timeline.rs 
b/crates/core/src/table/timeline.rs
index 9dcf6e2..70fc6ee 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -23,7 +23,7 @@ use std::fmt::Debug;
 use std::path::PathBuf;
 use std::sync::Arc;
 
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
 use arrow_schema::Schema;
 use parquet::arrow::parquet_to_arrow_schema;
 use serde_json::{Map, Value};
@@ -112,6 +112,13 @@ impl Timeline {
         Ok(completed_commits)
     }
 
+    pub fn get_latest_commit_timestamp(&self) -> Option<&str> {
+        self.instants
+            .iter()
+            .next_back()
+            .map(|instant| instant.timestamp.as_str())
+    }
+
     async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> {
         match self.instants.iter().next_back() {
             Some(instant) => {
@@ -120,8 +127,8 @@ impl Timeline {
                 let relative_path = commit_file_path.to_str().ok_or(anyhow!(
                     "Failed to get commit file path for instant: {:?}",
                     instant
-                ));
-                let bytes = self.storage.get_file_data(relative_path?).await;
+                ))?;
+                let bytes = self.storage.get_file_data(relative_path).await;
                 let json: Value = serde_json::from_slice(&bytes)?;
                 let commit_metadata = json
                     .as_object()
@@ -135,22 +142,29 @@ impl Timeline {
 
     pub async fn get_latest_schema(&self) -> Result<Schema> {
         let commit_metadata = self.get_latest_commit_metadata().await?;
-        if let Some(partition_to_write_stats) = 
commit_metadata["partitionToWriteStats"].as_object()
-        {
-            if let Some((_, value)) = partition_to_write_stats.iter().next() {
-                if let Some(first_value) = value.as_array().and_then(|arr| 
arr.first()) {
-                    if let Some(path) = first_value["path"].as_str() {
-                        let parquet_meta = 
self.storage.get_parquet_file_metadata(path).await;
-                        let arrow_schema = parquet_to_arrow_schema(
-                            parquet_meta.file_metadata().schema_descr(),
-                            None,
-                        )?;
-                        return Ok(arrow_schema);
-                    }
-                }
-            }
+
+        let parquet_path = commit_metadata
+            .get("partitionToWriteStats")
+            .and_then(|v| v.as_object())
+            .and_then(|obj| obj.values().next())
+            .and_then(|value| value.as_array())
+            .and_then(|arr| arr.first())
+            .and_then(|first_value| first_value["path"].as_str());
+
+        if let Some(path) = parquet_path {
+            let parquet_meta = self
+                .storage
+                .get_parquet_file_metadata(path)
+                .await
+                .context("Failed to get parquet file metadata")?;
+
+            
parquet_to_arrow_schema(parquet_meta.file_metadata().schema_descr(), None)
+                .context("Failed to resolve the latest schema")
+        } else {
+            Err(anyhow!(
+                "Failed to resolve the latest schema: no file path found"
+            ))
         }
-        Err(anyhow!("Failed to resolve schema."))
     }
 }
 
@@ -168,7 +182,7 @@ mod tests {
     use crate::table::timeline::{Instant, State, Timeline};
 
     #[tokio::test]
-    async fn read_latest_schema() {
+    async fn timeline_read_latest_schema() {
         let base_url = TestTable::V6Nonpartitioned.url();
         let timeline = Timeline::new(
             Arc::new(base_url),
@@ -181,6 +195,24 @@ mod tests {
         assert_eq!(table_schema.fields.len(), 21)
     }
 
+    #[tokio::test]
+    async fn timeline_read_latest_schema_from_empty_table() {
+        let base_url = TestTable::V6Empty.url();
+        let timeline = Timeline::new(
+            Arc::new(base_url),
+            Arc::new(HashMap::new()),
+            Arc::new(HashMap::new()),
+        )
+        .await
+        .unwrap();
+        let table_schema = timeline.get_latest_schema().await;
+        assert!(table_schema.is_err());
+        assert_eq!(
+            table_schema.err().unwrap().to_string(),
+            "Failed to resolve the latest schema: no file path found"
+        )
+    }
+
     #[tokio::test]
     async fn init_commits_timeline() {
         let base_url =
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index f4a1bba..f677247 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -65,17 +65,11 @@ impl HudiDataSource {
     }
 
     async fn get_record_batches(&mut self) -> 
datafusion_common::Result<Vec<RecordBatch>> {
-        let file_slices = self.table.get_file_slices().await.map_err(|e| {
-            DataFusionError::Execution(format!("Failed to load file slices 
from table: {}", e))
-        })?;
-
-        let mut record_batches = Vec::new();
-        for fsl in file_slices {
-            let batches = self.table.read_file_slice(&fsl).await.map_err(|e| {
-                DataFusionError::Execution(format!("Failed to read records 
from table: {}", e))
+        let record_batches =
+            self.table.read_snapshot().await.map_err(|e| {
+                DataFusionError::Execution(format!("Failed to read snapshot: 
{}", e))
             })?;
-            record_batches.extend(batches)
-        }
+
         Ok(record_batches)
     }
 }
diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs
index e467818..f94d5fd 100644
--- a/crates/tests/src/lib.rs
+++ b/crates/tests/src/lib.rs
@@ -35,6 +35,7 @@ pub fn extract_test_table(zip_path: &Path) -> PathBuf {
 
 pub enum TestTable {
     V6ComplexkeygenHivestyle,
+    V6Empty,
     V6Nonpartitioned,
 }
 
@@ -46,6 +47,7 @@ impl TestTable {
             Self::V6ComplexkeygenHivestyle => data_path
                 .join("v6_complexkeygen_hivestyle.zip")
                 .into_boxed_path(),
+            Self::V6Empty => data_path.join("v6_empty.zip").into_boxed_path(),
             Self::V6Nonpartitioned => 
data_path.join("v6_nonpartitioned.zip").into_boxed_path(),
         }
     }
@@ -58,6 +60,11 @@ impl TestTable {
                 .to_str()
                 .unwrap()
                 .to_string(),
+            Self::V6Empty => extract_test_table(&zip_path)
+                .join("v6_empty")
+                .to_str()
+                .unwrap()
+                .to_string(),
             Self::V6Nonpartitioned => extract_test_table(&zip_path)
                 .join("v6_nonpartitioned")
                 .to_str()
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 83ed929..67dd0cc 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -33,8 +33,10 @@ class HudiFileSlice:
     base_file_size: int
     num_records: int
 
+    @property
     def base_file_relative_path(self) -> str: ...
 
+
 class BindingHudiTable:
 
     def __init__(
@@ -43,8 +45,12 @@ class BindingHudiTable:
             storage_options: Optional[Dict[str, str]] = None,
     ): ...
 
-    def schema(self) -> "pyarrow.Schema": ...
+    def get_schema(self) -> "pyarrow.Schema": ...
+
+    def get_file_slices(self) -> List[HudiFileSlice]: ...
+
+    def read_file_slice(self, base_file_relative_path) -> 
List["pyarrow.RecordBatch"]: ...
 
-    def get_latest_file_slices(self) -> List[HudiFileSlice]: ...
+    def read_snapshot(self) -> List["pyarrow.RecordBatch"]: ...
 
-    def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]: 
...
+    def read_snapshot_as_of(self, timestamp: str) -> 
List["pyarrow.RecordBatch"]: ...
diff --git a/python/hudi/table.py b/python/hudi/table.py
index c9cab1b..943f423 100644
--- a/python/hudi/table.py
+++ b/python/hudi/table.py
@@ -36,16 +36,22 @@ class HudiTable:
     ):
         self._table = BindingHudiTable(str(table_uri), storage_options)
 
-    def schema(self) -> "pyarrow.Schema":
-        return self._table.schema()
+    def get_schema(self) -> "pyarrow.Schema":
+        return self._table.get_schema()
 
-    def split_latest_file_slices(self, n) -> Iterator[List[HudiFileSlice]]:
-        file_slices = self.get_latest_file_slices()
+    def split_file_slices(self, n: int) -> Iterator[List[HudiFileSlice]]:
+        file_slices = self.get_file_slices()
         for split in split_list(file_slices, n):
             yield split
 
-    def get_latest_file_slices(self) -> List[HudiFileSlice]:
-        return self._table.get_latest_file_slices()
+    def get_file_slices(self) -> List[HudiFileSlice]:
+        return self._table.get_file_slices()
 
-    def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]:
-        return self._table.read_file_slice(relative_path)
+    def read_file_slice(self, base_file_relative_path: str) -> 
List["pyarrow.RecordBatch"]:
+        return self._table.read_file_slice(base_file_relative_path)
+
+    def read_snapshot(self) -> List["pyarrow.RecordBatch"]:
+        return self._table.read_snapshot()
+
+    def read_snapshot_as_of(self, timestamp: str) -> 
List["pyarrow.RecordBatch"]:
+        return self._table.read_snapshot_as_of(timestamp)
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 2436ee5..10f39ec 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -21,7 +21,7 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::OnceLock;
 
-use anyhow::anyhow;
+use anyhow::Context;
 use arrow::pyarrow::ToPyArrow;
 use pyo3::prelude::*;
 use tokio::runtime::Runtime;
@@ -51,15 +51,15 @@ struct HudiFileSlice {
 #[pymethods]
 impl HudiFileSlice {
     pub fn base_file_relative_path(&self) -> PyResult<String> {
-        let mut p = PathBuf::from(&self.partition_path);
-        p.push(&self.base_file_name);
-        match p.to_str() {
-            Some(s) => Ok(s.to_string()),
-            None => Err(PyErr::from(anyhow!(
+        PathBuf::from(&self.partition_path)
+            .join(&self.base_file_name)
+            .to_str()
+            .map(String::from)
+            .context(format!(
                 "Failed to get base file relative path for file slice: {:?}",
                 self
-            ))),
-        }
+            ))
+            .map_err(PyErr::from)
     }
 }
 
@@ -100,21 +100,30 @@ impl BindingHudiTable {
         Ok(BindingHudiTable { _table })
     }
 
-    pub fn schema(&self, py: Python) -> PyResult<PyObject> {
+    pub fn get_schema(&self, py: Python) -> PyResult<PyObject> {
         rt().block_on(self._table.get_schema())?.to_pyarrow(py)
     }
 
-    pub fn get_latest_file_slices(&mut self, py: Python) -> 
PyResult<Vec<HudiFileSlice>> {
+    pub fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
         py.allow_threads(|| {
             let file_slices = rt().block_on(self._table.get_file_slices())?;
             Ok(file_slices.iter().map(convert_file_slice).collect())
         })
     }
 
-    pub fn read_file_slice(&mut self, relative_path: &str, py: Python) -> 
PyResult<PyObject> {
+    pub fn read_file_slice(&self, relative_path: &str, py: Python) -> 
PyResult<PyObject> {
         rt().block_on(self._table.read_file_slice_by_path(relative_path))?
             .to_pyarrow(py)
     }
+
+    pub fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
+        rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
+    }
+
+    pub fn read_snapshot_as_of(&self, timestamp: &str, py: Python) -> 
PyResult<PyObject> {
+        rt().block_on(self._table.read_snapshot_as_of(timestamp))?
+            .to_pyarrow(py)
+    }
 }
 
 #[cfg(not(tarpaulin))]
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index f0266f0..aec6d70 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -28,26 +28,52 @@ def test_sample_table(get_sample_table):
     table_path = get_sample_table
     table = HudiTable(table_path, {})
 
-    assert table.schema().names == ['_hoodie_commit_time', 
'_hoodie_commit_seqno', '_hoodie_record_key',
-                                    '_hoodie_partition_path', 
'_hoodie_file_name', 'ts', 'uuid', 'rider', 'driver',
-                                    'fare', 'city']
+    assert table.get_schema().names == ['_hoodie_commit_time', 
'_hoodie_commit_seqno', '_hoodie_record_key',
+                                        '_hoodie_partition_path', 
'_hoodie_file_name', 'ts', 'uuid', 'rider', 'driver',
+                                        'fare', 'city']
 
-    file_slices = table.get_latest_file_slices()
+    file_slices = table.get_file_slices()
     assert len(file_slices) == 5
     assert set(f.commit_time for f in file_slices) == {'20240402123035233', 
'20240402144910683'}
     assert all(f.num_records == 1 for f in file_slices)
     file_slice_paths = [f.base_file_relative_path() for f in file_slices]
     assert set(file_slice_paths) == 
{'chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet',
-                                
'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet',
-                                
'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet',
-                                
'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet',
-                                
'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'}
+                                     
'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet',
+                                     
'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet',
+                                     
'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet',
+                                     
'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'}
 
     batches = table.read_file_slice(file_slice_paths[0])
     t = pa.Table.from_batches(batches)
     assert t.num_rows == 1
     assert t.num_columns == 11
 
-    file_slices_gen = table.split_latest_file_slices(2)
+    file_slices_gen = table.split_file_slices(2)
     assert len(next(file_slices_gen)) == 3
     assert len(next(file_slices_gen)) == 2
+
+    batches = table.read_snapshot()
+    t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
+    assert t.to_pylist() == [{'_hoodie_commit_time': '20240402144910683', 
'ts': 1695046462179,
+                              'uuid': '9909a8b1-2d15-4d3d-8ec9-efc48c536a00', 
'fare': 339.0},
+                             {'_hoodie_commit_time': '20240402123035233', 
'ts': 1695091554788,
+                              'uuid': 'e96c4396-3fad-413a-a942-4cb36106d721', 
'fare': 27.7},
+                             {'_hoodie_commit_time': '20240402123035233', 
'ts': 1695115999911,
+                              'uuid': 'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa', 
'fare': 17.85},
+                             {'_hoodie_commit_time': '20240402123035233', 
'ts': 1695159649087,
+                              'uuid': '334e26e9-8355-45cc-97c6-c31daf0df330', 
'fare': 19.1},
+                             {'_hoodie_commit_time': '20240402123035233', 
'ts': 1695516137016,
+                              'uuid': 'e3cf430c-889d-4015-bc98-59bdce1e530c', 
'fare': 34.15}]
+
+    batches = table.read_snapshot_as_of("20240402123035233")
+    t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
+    assert t.to_pylist() == [{'_hoodie_commit_time': '20240402123035233', 
'ts': 1695046462179,
+                              'uuid': '9909a8b1-2d15-4d3d-8ec9-efc48c536a00', 
'fare': 33.9},
+                             {'_hoodie_commit_time': '20240402123035233', 
'ts': 1695091554788,
+                              'uuid': 'e96c4396-3fad-413a-a942-4cb36106d721', 
'fare': 27.7},
+                             {'_hoodie_commit_time': '20240402123035233', 
'ts': 1695115999911,
+                              'uuid': 'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa', 
'fare': 17.85},
+                             {'_hoodie_commit_time': '20240402123035233', 
'ts': 1695159649087,
+                              'uuid': '334e26e9-8355-45cc-97c6-c31daf0df330', 
'fare': 19.1},
+                             {'_hoodie_commit_time': '20240402123035233', 
'ts': 1695516137016,
+                              'uuid': 'e3cf430c-889d-4015-bc98-59bdce1e530c', 
'fare': 34.15}]

Reply via email to