This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new e15174d  feat: implement Rust and Python APIs to read file slices (#28)
e15174d is described below

commit e15174d505ab22407007fdc009adce0d5fd6cb7d
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Wed Jun 26 23:29:02 2024 -0500

    feat: implement Rust and Python APIs to read file slices (#28)
---
 Cargo.toml                                         |   2 +-
 crates/core/Cargo.toml                             |   1 +
 crates/core/src/file_group/mod.rs                  |  65 ++++++---
 crates/core/src/lib.rs                             |   2 +-
 crates/core/src/{lib.rs => storage/file_info.rs}   |  16 +-
 crates/core/src/{lib.rs => storage/file_stats.rs}  |  14 +-
 crates/core/src/storage/mod.rs                     | 145 ++++++++++--------
 .../src/storage/{file_metadata.rs => utils.rs}     |  37 ++---
 crates/core/src/table/fs_view.rs                   |  95 +++++++++---
 crates/core/src/table/mod.rs                       | 162 +++++++++++++++------
 crates/core/src/timeline/mod.rs                    |  38 ++---
 crates/datafusion/src/lib.rs                       |  10 +-
 python/Cargo.toml                                  |  16 +-
 python/hudi/__init__.py                            |   1 +
 python/hudi/_internal.pyi                          |  26 +++-
 python/hudi/{_internal.pyi => _utils.py}           |  17 +--
 python/hudi/table.py                               |  31 +++-
 python/pyproject.toml                              |  15 ++
 python/src/lib.rs                                  |  75 ++++++++--
 python/{hudi => tests}/__init__.py                 |   4 -
 python/{hudi/table.py => tests/conftest.py}        |  26 ++--
 python/tests/test_table_read.py                    |  53 +++++++
 22 files changed, 595 insertions(+), 256 deletions(-)
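
For illustration, a minimal usage sketch of the Python API introduced by this commit, based on the bindings and the test added in python/tests/test_table_read.py (the table path below is a placeholder for an extracted sample table):

    import pyarrow as pa
    from hudi import HudiTable

    # Placeholder path to an extracted copy of a Hudi table.
    table = HudiTable("/tmp/trips_table", {})

    # Latest table schema, resolved from the timeline, as a pyarrow.Schema.
    print(table.schema().names)

    # File slices of the latest snapshot; each carries base file info and stats.
    file_slices = table.get_latest_file_slices()
    for fs in file_slices:
        print(fs.base_file_path, fs.num_records)

    # Read one file slice back as a list of pyarrow.RecordBatch.
    batches = table.read_file_slice(file_slices[0].base_file_path)
    print(pa.Table.from_batches(batches).num_rows)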

diff --git a/Cargo.toml b/Cargo.toml
index 28340c9..6d21195 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,7 +30,7 @@ rust-version = "1.75.0"
 
 [workspace.dependencies]
 # arrow
-arrow = { version = "50" }
+arrow = { version = "50", features = ["pyarrow"] }
 arrow-arith = { version = "50" }
 arrow-array = { version = "50", features = ["chrono-tz"] }
 arrow-buffer = { version = "50" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index bdf05bb..98c12ab 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -67,6 +67,7 @@ url = { workspace = true }
 async-recursion = { workspace = true }
 async-trait = { workspace = true }
 tokio = { workspace = true }
+futures = { workspace = true }
 
 # test
 tempfile = "3.10.1"
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index b0d791f..ec2e171 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -21,33 +21,50 @@ use std::collections::BTreeMap;
 use std::fmt;
 use std::fmt::Formatter;
 
-use crate::storage::file_metadata::FileMetadata;
+use crate::storage::file_info::FileInfo;
+use crate::storage::file_stats::FileStats;
 use anyhow::{anyhow, Result};
 
 #[derive(Clone, Debug)]
 pub struct BaseFile {
     pub file_group_id: String,
     pub commit_time: String,
-    pub metadata: Option<FileMetadata>,
+    pub info: FileInfo,
+    pub stats: Option<FileStats>,
 }
 
 impl BaseFile {
-    pub fn new(file_name: &str) -> Self {
-        let (name, _) = file_name.rsplit_once('.').unwrap();
+    fn parse_file_name(file_name: &str) -> Result<(String, String)> {
+        let err_msg = format!("Failed to parse file name '{}' for base file.", file_name);
+        let (name, _) = file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?;
         let parts: Vec<&str> = name.split('_').collect();
-        let file_group_id = parts[0].to_owned();
-        let commit_time = parts[2].to_owned();
-        Self {
+        let file_group_id = parts.first().ok_or(anyhow!(err_msg.clone()))?.to_string();
+        let commit_time = parts.get(2).ok_or(anyhow!(err_msg.clone()))?.to_string();
+        Ok((file_group_id, commit_time))
+    }
+
+    pub fn from_file_name(file_name: &str) -> Result<Self> {
+        let (file_group_id, commit_time) = Self::parse_file_name(file_name)?;
+        Ok(Self {
             file_group_id,
             commit_time,
-            metadata: None,
-        }
+            info: FileInfo::default(),
+            stats: None,
+        })
     }
 
-    pub fn from_file_metadata(file_metadata: FileMetadata) -> Self {
-        let mut base_file = Self::new(file_metadata.name.as_str());
-        base_file.metadata = Some(file_metadata);
-        base_file
+    pub fn from_file_info(info: FileInfo) -> Result<Self> {
+        let (file_group_id, commit_time) = Self::parse_file_name(&info.name)?;
+        Ok(Self {
+            file_group_id,
+            commit_time,
+            info,
+            stats: None,
+        })
+    }
+
+    pub fn populate_stats(&mut self, stats: FileStats) {
+        self.stats = Some(stats)
     }
 }
 
@@ -58,11 +75,8 @@ pub struct FileSlice {
 }
 
 impl FileSlice {
-    pub fn base_file_path(&self) -> Option<&str> {
-        match &self.base_file.metadata {
-            None => None,
-            Some(file_metadata) => Some(file_metadata.path.as_str()),
-        }
+    pub fn base_file_path(&self) -> &str {
+        self.base_file.info.uri.as_str()
     }
 
     pub fn file_group_id(&self) -> &str {
@@ -102,9 +116,9 @@ impl FileGroup {
         }
     }
 
-    #[allow(dead_code)]
-    pub fn add_base_file_from_name(&mut self, file_name: &str) -> Result<&Self> {
-        let base_file = BaseFile::new(file_name);
+    #[cfg(test)]
+    fn add_base_file_from_name(&mut self, file_name: &str) -> Result<&Self> {
+        let base_file = BaseFile::from_file_name(file_name)?;
         self.add_base_file(base_file)
     }
 
@@ -131,6 +145,10 @@ impl FileGroup {
     pub fn get_latest_file_slice(&self) -> Option<&FileSlice> {
         return self.file_slices.values().next_back();
     }
+
+    pub fn get_latest_file_slice_mut(&mut self) -> Option<&mut FileSlice> {
+        return self.file_slices.values_mut().next_back();
+    }
 }
 
 #[cfg(test)]
@@ -139,9 +157,10 @@ mod tests {
 
     #[test]
     fn create_a_base_file_successfully() {
-        let base_file = BaseFile::new(
+        let base_file = BaseFile::from_file_name(
             "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
-        );
+        )
+        .unwrap();
         assert_eq!(
             base_file.file_group_id,
             "5a226868-2934-4f84-a16f-55124630c68d-0"
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 0d56755..d2c53ee 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -19,7 +19,7 @@
 
 use crate::table::Table;
 
-mod file_group;
+pub mod file_group;
 pub mod table;
 pub type HudiTable = Table;
 mod storage;
diff --git a/crates/core/src/lib.rs b/crates/core/src/storage/file_info.rs
similarity index 79%
copy from crates/core/src/lib.rs
copy to crates/core/src/storage/file_info.rs
index 0d56755..4bd178d 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/storage/file_info.rs
@@ -17,15 +17,9 @@
  * under the License.
  */
 
-use crate::table::Table;
-
-mod file_group;
-pub mod table;
-pub type HudiTable = Table;
-mod storage;
-pub mod test_utils;
-mod timeline;
-
-pub fn crate_version() -> &'static str {
-    env!("CARGO_PKG_VERSION")
+#[derive(Clone, Debug, Default)]
+pub struct FileInfo {
+    pub uri: String,
+    pub name: String,
+    pub size: usize,
 }
diff --git a/crates/core/src/lib.rs b/crates/core/src/storage/file_stats.rs
similarity index 79%
copy from crates/core/src/lib.rs
copy to crates/core/src/storage/file_stats.rs
index 0d56755..ec63c14 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/storage/file_stats.rs
@@ -17,15 +17,7 @@
  * under the License.
  */
 
-use crate::table::Table;
-
-mod file_group;
-pub mod table;
-pub type HudiTable = Table;
-mod storage;
-pub mod test_utils;
-mod timeline;
-
-pub fn crate_version() -> &'static str {
-    env!("CARGO_PKG_VERSION")
+#[derive(Clone, Debug, Default)]
+pub struct FileStats {
+    pub num_records: i64,
 }
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 3de77a4..c8b7b34 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -21,8 +21,10 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
 
+use arrow::record_batch::RecordBatch;
 use async_recursion::async_recursion;
 use bytes::Bytes;
+use futures::StreamExt;
 use object_store::path::Path as ObjPath;
 use object_store::{parse_url_opts, ObjectStore};
 use parquet::arrow::async_reader::ParquetObjectReader;
@@ -30,9 +32,12 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::file::metadata::ParquetMetaData;
 use url::Url;
 
-use crate::storage::file_metadata::FileMetadata;
+use crate::storage::file_info::FileInfo;
+use crate::storage::utils::join_url_segments;
 
-pub(crate) mod file_metadata;
+pub(crate) mod file_info;
+pub(crate) mod file_stats;
+pub(crate) mod utils;
 
 #[allow(dead_code)]
 pub struct Storage {
@@ -41,10 +46,8 @@ pub struct Storage {
     options: HashMap<String, String>,
 }
 
-#[allow(dead_code)]
 impl Storage {
-    pub fn new(base_uri: &str, options: HashMap<String, String>) -> Box<Storage> {
-        let base_url = Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap();
+    pub fn new(base_url: Url, options: HashMap<String, String>) -> Box<Storage> {
         let object_store = parse_url_opts(&base_url, &options).unwrap().0;
         Box::from(Storage {
             base_url,
@@ -53,76 +56,88 @@ impl Storage {
         })
     }
 
-    pub async fn get_file_metadata(&self, relative_path: &str) -> FileMetadata {
-        let mut obj_url = self.base_url.clone();
-        obj_url.path_segments_mut().unwrap().push(relative_path);
+    #[allow(dead_code)]
+    pub async fn get_file_info(&self, relative_path: &str) -> FileInfo {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
         let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
         let meta = self.object_store.head(&obj_path).await.unwrap();
-        FileMetadata {
-            path: meta.location.to_string(),
+        FileInfo {
+            uri: obj_url.to_string(),
             name: obj_path.filename().unwrap().to_string(),
             size: meta.size,
-            num_records: None,
         }
     }
 
     pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> ParquetMetaData {
-        let mut obj_url = self.base_url.clone();
-        obj_url.path_segments_mut().unwrap().push(relative_path);
+        let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
         let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
-        let meta = self.object_store.head(&obj_path).await.unwrap();
-        let reader = ParquetObjectReader::new(self.object_store.clone(), meta);
+        let obj_store = self.object_store.clone();
+        let meta = obj_store.head(&obj_path).await.unwrap();
+        let reader = ParquetObjectReader::new(obj_store, meta);
         let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
         builder.metadata().as_ref().to_owned()
     }
 
     pub async fn get_file_data(&self, relative_path: &str) -> Bytes {
-        let mut obj_url = self.base_url.clone();
-        obj_url.path_segments_mut().unwrap().push(relative_path);
+        let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
         let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
         let result = self.object_store.get(&obj_path).await.unwrap();
         result.bytes().await.unwrap()
     }
 
+    pub async fn get_parquet_file_data(&self, relative_path: &str) -> Vec<RecordBatch> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
+        let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
+        let obj_store = self.object_store.clone();
+        let meta = obj_store.head(&obj_path).await.unwrap();
+        let reader = ParquetObjectReader::new(obj_store, meta);
+        let stream = ParquetRecordBatchStreamBuilder::new(reader)
+            .await
+            .unwrap()
+            .build()
+            .unwrap();
+        stream
+            .collect::<Vec<_>>()
+            .await
+            .into_iter()
+            .map(|r| r.unwrap())
+            .collect()
+    }
+
     pub async fn list_dirs(&self, subdir: Option<&str>) -> Vec<String> {
-        self.list_dirs_as_paths(subdir)
+        self.list_dirs_as_obj_paths(subdir)
             .await
             .into_iter()
             .map(|p| p.filename().unwrap().to_string())
             .collect()
     }
 
-    pub async fn list_dirs_as_paths(&self, subdir: Option<&str>) -> Vec<ObjPath> {
-        let mut prefix_url = self.base_url.clone();
-        if let Some(subdir) = subdir {
-            prefix_url.path_segments_mut().unwrap().push(subdir);
-        }
-        let prefix = ObjPath::from_url_path(prefix_url.path()).unwrap();
+    async fn list_dirs_as_obj_paths(&self, subdir: Option<&str>) -> Vec<ObjPath> {
+        let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap();
+        let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
         self.object_store
-            .list_with_delimiter(Some(&prefix))
+            .list_with_delimiter(Some(&prefix_path))
             .await
             .unwrap()
             .common_prefixes
     }
 
-    pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileMetadata> {
-        let mut prefix_url = self.base_url.clone();
-        if let Some(subdir) = subdir {
-            prefix_url.path_segments_mut().unwrap().push(subdir);
-        }
-        let prefix = ObjPath::from_url_path(prefix_url.path()).unwrap();
+    pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileInfo> {
+        let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap();
+        let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
         self.object_store
-            .list_with_delimiter(Some(&prefix))
+            .list_with_delimiter(Some(&prefix_path))
             .await
             .unwrap()
             .objects
             .into_iter()
-            .map(|obj_meta| {
-                FileMetadata::new(
-                    obj_meta.location.to_string(),
-                    obj_meta.location.filename().unwrap().to_string(),
-                    obj_meta.size,
-                )
+            .map(|obj_meta| FileInfo {
+                uri: prefix_url
+                    .join(obj_meta.location.filename().unwrap())
+                    .unwrap()
+                    .to_string(),
+                name: obj_meta.location.filename().unwrap().to_string(),
+                size: obj_meta.size,
             })
             .collect()
     }
@@ -157,6 +172,7 @@ mod tests {
     use object_store::path::Path as ObjPath;
     use url::Url;
 
+    use crate::storage::utils::join_url_segments;
     use crate::storage::{get_leaf_dirs, Storage};
 
     #[tokio::test]
@@ -165,7 +181,7 @@ mod tests {
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url.path(), HashMap::new());
+        let storage = Storage::new(base_url, HashMap::new());
         let first_level_dirs: HashSet<String> = storage.list_dirs(None).await.into_iter().collect();
         assert_eq!(
             first_level_dirs,
@@ -186,12 +202,18 @@ mod tests {
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url.path(), HashMap::new());
-        let first_level_dirs: HashSet<ObjPath> =
-            storage.list_dirs_as_paths(None).await.into_iter().collect();
+        let storage = Storage::new(base_url, HashMap::new());
+        let first_level_dirs: HashSet<ObjPath> = storage
+            .list_dirs_as_obj_paths(None)
+            .await
+            .into_iter()
+            .collect();
         let expected_paths: HashSet<ObjPath> = vec![".hoodie", "part1", "part2", "part3"]
             .into_iter()
-            .map(|dir| ObjPath::from_url_path(base_url.join(dir).unwrap().path()).unwrap())
+            .map(|dir| {
+                ObjPath::from_url_path(join_url_segments(&storage.base_url, &[dir]).unwrap().path())
+                    .unwrap()
+            })
             .collect();
         assert_eq!(first_level_dirs, expected_paths);
     }
@@ -202,26 +224,26 @@ mod tests {
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url.path(), HashMap::new());
+        let storage = Storage::new(base_url, HashMap::new());
         let file_names_1: Vec<String> = storage
             .list_files(None)
             .await
             .into_iter()
-            .map(|file_metadata| file_metadata.name)
+            .map(|file_info| file_info.name)
             .collect();
         assert_eq!(file_names_1, vec!["a.parquet"]);
         let file_names_2: Vec<String> = storage
             .list_files(Some("part1"))
             .await
             .into_iter()
-            .map(|file_metadata| file_metadata.name)
+            .map(|file_info| file_info.name)
             .collect();
         assert_eq!(file_names_2, vec!["b.parquet"]);
         let file_names_3: Vec<String> = storage
             .list_files(Some("part2/part22"))
             .await
             .into_iter()
-            .map(|file_metadata| file_metadata.name)
+            .map(|file_info| file_info.name)
             .collect();
         assert_eq!(file_names_3, vec!["c.parquet"]);
     }
@@ -232,7 +254,7 @@ mod tests {
             canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(base_url.path(), HashMap::new());
+        let storage = Storage::new(base_url, HashMap::new());
         let leaf_dirs = get_leaf_dirs(&storage, None).await;
         assert_eq!(
             leaf_dirs,
@@ -241,19 +263,26 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn get_file_metadata() {
+    async fn storage_get_file_info() {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
-        let storage = Storage::new(base_url.path(), HashMap::new());
-        let file_metadata = storage.get_file_metadata("a.parquet").await;
-        assert_eq!(file_metadata.name, "a.parquet");
+        let storage = Storage::new(base_url, HashMap::new());
+        let file_info = storage.get_file_info("a.parquet").await;
+        assert_eq!(file_info.name, "a.parquet");
         assert_eq!(
-            file_metadata.path,
-            ObjPath::from_url_path(base_url.join("a.parquet").unwrap().path())
-                .unwrap()
-                .to_string()
+            file_info.uri,
+            storage.base_url.join("a.parquet").unwrap().to_string()
         );
-        assert_eq!(file_metadata.size, 866);
-        assert_eq!(file_metadata.num_records, None);
+        assert_eq!(file_info.size, 866);
+    }
+
+    #[tokio::test]
+    async fn storage_get_parquet_file_data() {
+        let base_url =
+            Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
+        let storage = Storage::new(base_url, HashMap::new());
+        let file_data = storage.get_parquet_file_data("a.parquet").await;
+        assert_eq!(file_data.len(), 1);
+        assert_eq!(file_data.first().unwrap().num_rows(), 5);
     }
 }
diff --git a/crates/core/src/storage/file_metadata.rs b/crates/core/src/storage/utils.rs
similarity index 74%
rename from crates/core/src/storage/file_metadata.rs
rename to crates/core/src/storage/utils.rs
index a7b8f28..cf81dc0 100644
--- a/crates/core/src/storage/file_metadata.rs
+++ b/crates/core/src/storage/utils.rs
@@ -17,28 +17,9 @@
  * under the License.
  */
 
-use anyhow::anyhow;
-use anyhow::Result;
+use anyhow::{anyhow, Result};
 use std::path::Path;
-
-#[derive(Clone, Debug, Default, Eq, PartialEq)]
-pub struct FileMetadata {
-    pub path: String,
-    pub name: String,
-    pub size: usize,
-    pub num_records: Option<usize>,
-}
-
-impl FileMetadata {
-    pub fn new(path: String, name: String, size: usize) -> FileMetadata {
-        FileMetadata {
-            path,
-            name,
-            size,
-            num_records: None,
-        }
-    }
-}
+use url::{ParseError, Url};
 
 pub fn split_filename(filename: &str) -> Result<(String, String)> {
     let path = Path::new(filename);
@@ -57,3 +38,17 @@ pub fn split_filename(filename: &str) -> Result<(String, String)> {
 
     Ok((stem, extension))
 }
+
+pub fn join_url_segments(base_url: &Url, segments: &[&str]) -> Result<Url> {
+    let mut url = base_url.clone();
+
+    if url.path().ends_with('/') {
+        url.path_segments_mut().unwrap().pop();
+    }
+
+    url.path_segments_mut()
+        .map_err(|_| ParseError::RelativeUrlWithoutBase)?
+        .extend(segments);
+
+    Ok(url)
+}
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 2f3b981..5f9cf0f 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -18,30 +18,33 @@
  */
 
 use std::collections::HashMap;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
 
 use anyhow::{anyhow, Result};
+use arrow::record_batch::RecordBatch;
+use url::Url;
 
 use crate::file_group::{BaseFile, FileGroup, FileSlice};
-use crate::storage::file_metadata::FileMetadata;
+use crate::storage::file_info::FileInfo;
+use crate::storage::file_stats::FileStats;
 use crate::storage::{get_leaf_dirs, Storage};
 
 #[derive(Clone, Debug)]
 pub struct FileSystemView {
-    pub base_path: PathBuf,
+    pub base_url: Url,
     partition_to_file_groups: HashMap<String, Vec<FileGroup>>,
 }
 
 impl FileSystemView {
-    pub fn new(base_path: &Path) -> Self {
+    pub fn new(base_url: Url) -> Self {
         FileSystemView {
-            base_path: base_path.to_path_buf(),
+            base_url,
             partition_to_file_groups: HashMap::new(),
         }
     }
 
     async fn get_partition_paths(&self) -> Result<Vec<String>> {
-        let storage = Storage::new(self.base_path.to_str().unwrap(), HashMap::new());
+        let storage = Storage::new(self.base_url.clone(), HashMap::new());
         let top_level_dirs: Vec<String> = storage
             .list_dirs(None)
             .await
@@ -56,16 +59,16 @@ impl FileSystemView {
     }
 
     async fn get_file_groups(&self, partition_path: &str) -> Result<Vec<FileGroup>> {
-        let storage = Storage::new(self.base_path.to_str().unwrap(), HashMap::new());
-        let file_metadata: Vec<FileMetadata> = storage
+        let storage = Storage::new(self.base_url.clone(), HashMap::new());
+        let file_info: Vec<FileInfo> = storage
             .list_files(Some(partition_path))
             .await
             .into_iter()
             .filter(|f| f.name.ends_with(".parquet"))
             .collect();
         let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new();
-        for f in file_metadata {
-            let base_file = BaseFile::from_file_metadata(f);
+        for f in file_info {
+            let base_file = BaseFile::from_file_info(f)?;
             let fg_id = &base_file.file_group_id;
             fg_id_to_base_files
                 .entry(fg_id.to_owned())
@@ -84,8 +87,7 @@ impl FileSystemView {
         Ok(file_groups)
     }
 
-    pub fn get_latest_file_slices(&mut self) -> Vec<&FileSlice> {
-        let mut file_slices = Vec::new();
+    pub fn load_file_groups(&mut self) {
         let fs_view = self.clone();
         let rt = tokio::runtime::Builder::new_current_thread()
             .enable_all()
@@ -96,6 +98,10 @@ impl FileSystemView {
         for (k, v) in result {
             self.partition_to_file_groups.insert(k, v);
         }
+    }
+
+    pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
+        let mut file_slices = Vec::new();
         for fgs in self.partition_to_file_groups.values() {
             for fg in fgs {
                 if let Some(file_slice) = fg.get_latest_file_slice() {
@@ -105,6 +111,52 @@ impl FileSystemView {
         }
         file_slices
     }
+
+    pub fn get_latest_file_slices_with_stats(&mut self) -> Vec<&mut FileSlice> {
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        let mut file_slices = Vec::new();
+        let file_groups = &mut self.partition_to_file_groups.values_mut();
+        for fgs in file_groups {
+            for fg in fgs {
+                if let Some(file_slice) = fg.get_latest_file_slice_mut() {
+                    let wrapper = async { load_file_slice_stats(&self.base_url, file_slice).await };
+                    let _ = rt.block_on(wrapper);
+                    file_slices.push(file_slice)
+                }
+            }
+        }
+        file_slices
+    }
+
+    pub fn read_file_slice(&self, relative_path: &str) -> Vec<RecordBatch> {
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        let storage = Storage::new(self.base_url.clone(), HashMap::new());
+        let wrapper = async { storage.get_parquet_file_data(relative_path).await };
+        rt.block_on(wrapper)
+    }
+}
+
+async fn load_file_slice_stats(base_url: &Url, file_slice: &mut FileSlice) -> Result<()> {
+    let base_file = &mut file_slice.base_file;
+    if base_file.stats.is_none() {
+        let storage = Storage::new(base_url.clone(), HashMap::new());
+        let ptn = file_slice.partition_path.clone();
+        let mut relative_path = PathBuf::from(ptn.unwrap_or("".to_string()));
+        let base_file_name = &base_file.info.name;
+        relative_path.push(base_file_name);
+        let parquet_meta = storage
+            .get_parquet_file_metadata(relative_path.to_str().unwrap())
+            .await;
+        let num_records = parquet_meta.file_metadata().num_rows();
+        base_file.populate_stats(FileStats { num_records });
+    }
+    Ok(())
 }
 
 async fn get_partitions_and_file_groups(
@@ -133,17 +185,20 @@ async fn get_partitions_and_file_groups(
 #[cfg(test)]
 mod tests {
     use std::collections::HashSet;
+    use std::fs::canonicalize;
     use std::path::Path;
 
-    use crate::test_utils::extract_test_table;
+    use url::Url;
 
     use crate::table::fs_view::FileSystemView;
+    use crate::test_utils::extract_test_table;
 
     #[tokio::test]
     async fn get_partition_paths() {
-        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-        let target_table_path = extract_test_table(fixture_path);
-        let fs_view = FileSystemView::new(&target_table_path);
+        let fixture_path =
+            canonicalize(Path::new("fixtures/table/0.x_cow_partitioned.zip")).unwrap();
+        let base_url = Url::from_file_path(extract_test_table(&fixture_path)).unwrap();
+        let fs_view = FileSystemView::new(base_url);
         let partition_paths = fs_view.get_partition_paths().await.unwrap();
         let partition_path_set: HashSet<&str> =
             HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
@@ -155,9 +210,11 @@ mod tests {
 
     #[test]
     fn get_latest_file_slices() {
-        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-        let target_table_path = extract_test_table(fixture_path);
-        let mut fs_view = FileSystemView::new(&target_table_path);
+        let fixture_path =
+            canonicalize(Path::new("fixtures/table/0.x_cow_partitioned.zip")).unwrap();
+        let base_url = Url::from_file_path(extract_test_table(&fixture_path)).unwrap();
+        let mut fs_view = FileSystemView::new(base_url);
+        fs_view.load_file_groups();
         let file_slices = fs_view.get_latest_file_slices();
         assert_eq!(file_slices.len(), 5);
         let mut fg_ids = Vec::new();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 9191aa4..681ef05 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -17,16 +17,18 @@
  * under the License.
  */
 
-use anyhow::Result;
 use std::collections::HashMap;
-use std::fs::File;
 use std::io::{BufRead, BufReader};
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
 use std::str::FromStr;
 
+use anyhow::Result;
+use arrow::record_batch::RecordBatch;
 use arrow_schema::SchemaRef;
+use url::Url;
 
 use crate::file_group::FileSlice;
+use crate::storage::Storage;
 use crate::table::config::BaseFileFormat;
 use crate::table::config::{ConfigKey, TableType};
 use crate::table::fs_view::FileSystemView;
@@ -38,26 +40,44 @@ mod fs_view;
 mod metadata;
 
 #[derive(Debug, Clone)]
+#[allow(dead_code)]
 pub struct Table {
-    pub base_path: PathBuf,
+    pub base_url: Url,
     pub props: HashMap<String, String>,
+    pub file_system_view: Option<FileSystemView>,
+    pub storage_options: HashMap<String, String>,
 }
 
 impl Table {
-    pub fn new(table_base_path: &str) -> Self {
-        let base_path = PathBuf::from(table_base_path);
-        let props_path = base_path.join(".hoodie").join("hoodie.properties");
-        match Self::load_properties(props_path.as_path()) {
-            Ok(props) => Self { base_path, props },
+    pub fn new(base_uri: &str, storage_options: HashMap<String, String>) -> Self {
+        let base_url = Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap();
+        match Self::load_properties(&base_url, ".hoodie/hoodie.properties", &storage_options) {
+            Ok(props) => Self {
+                base_url,
+                props,
+                file_system_view: None,
+                storage_options,
+            },
             Err(e) => {
                 panic!("Failed to load table properties: {}", e)
             }
         }
     }
 
-    fn load_properties(path: &Path) -> Result<HashMap<String, String>> {
-        let file = File::open(path)?;
-        let reader = BufReader::new(file);
+    fn load_properties(
+        base_url: &Url,
+        props_path: &str,
+        storage_options: &HashMap<String, String>,
+    ) -> Result<HashMap<String, String>> {
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        let storage = Storage::new(base_url.clone(), storage_options.clone());
+        let get_data = async { storage.get_file_data(props_path).await };
+        let data = rt.block_on(get_data);
+        let cursor = std::io::Cursor::new(data);
+        let reader = BufReader::new(cursor);
         let lines = reader.lines();
         let mut properties: HashMap<String, String> = HashMap::new();
         for line in lines {
@@ -81,58 +101,70 @@ impl Table {
         }
     }
 
-    pub fn get_timeline(&self) -> Result<Timeline> {
+    #[cfg(test)]
+    fn get_timeline(&self) -> Result<Timeline> {
         let rt = tokio::runtime::Builder::new_current_thread()
             .enable_all()
             .build()
             .unwrap();
-        let f = async { Timeline::new(self.base_path.to_str().unwrap()).await };
-        rt.block_on(f)
+        let init_timeline = async { Timeline::new(self.base_url.clone()).await };
+        rt.block_on(init_timeline)
     }
 
-    pub fn schema(&self) -> SchemaRef {
+    pub fn get_latest_schema(&self) -> SchemaRef {
         let rt = tokio::runtime::Builder::new_current_thread()
             .enable_all()
             .build()
             .unwrap();
-        let f = async { Timeline::new(self.base_path.to_str().unwrap()).await };
-        let timeline = rt.block_on(f);
+        let init_timeline = async { Timeline::new(self.base_url.clone()).await };
+        let timeline = rt.block_on(init_timeline);
         match timeline {
             Ok(timeline) => {
                 let rt = tokio::runtime::Builder::new_current_thread()
                     .enable_all()
                     .build()
                     .unwrap();
-                let wrapper = async { timeline.get_latest_schema().await };
-                let result = rt.block_on(wrapper);
-                match result {
+                let get_schema = async { timeline.get_latest_schema().await };
+                match rt.block_on(get_schema) {
                     Ok(schema) => SchemaRef::from(schema),
-                    Err(e) => {
-                        panic!("Failed to resolve table schema: {}", e)
-                    }
+                    Err(e) => panic!("Failed to resolve table schema: {}", e),
                 }
             }
-            Err(e) => {
-                panic!("Failed to resolve table schema: {}", e)
-            }
+            Err(e) => panic!("Failed to resolve table schema: {}", e),
         }
     }
 
-    pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>> {
+    pub fn get_latest_file_slices(&mut self) -> Result<Vec<FileSlice>> {
+        if self.file_system_view.is_none() {
+            let mut new_fs_view = FileSystemView::new(self.base_url.clone());
+            new_fs_view.load_file_groups();
+            self.file_system_view = Some(new_fs_view);
+        }
+
+        let fs_view = self.file_system_view.as_mut().unwrap();
+
         let mut file_slices = Vec::new();
-        let mut fs_view = FileSystemView::new(self.base_path.as_path());
-        for f in fs_view.get_latest_file_slices() {
+        for f in fs_view.get_latest_file_slices_with_stats() {
             file_slices.push(f.clone());
         }
         Ok(file_slices)
     }
 
-    pub fn get_latest_file_paths(&self) -> Result<Vec<String>> {
+    pub fn read_file_slice(&mut self, relative_path: &str) -> Vec<RecordBatch> {
+        if self.file_system_view.is_none() {
+            let mut new_fs_view = FileSystemView::new(self.base_url.clone());
+            new_fs_view.load_file_groups();
+            self.file_system_view = Some(new_fs_view);
+        }
+
+        let fs_view = self.file_system_view.as_ref().unwrap();
+        fs_view.read_file_slice(relative_path)
+    }
+
+    pub fn get_latest_file_paths(&mut self) -> Result<Vec<String>> {
         let mut file_paths = Vec::new();
         for f in self.get_latest_file_slices()? {
-            if let Some(f) = f.base_file_path() {
-                file_paths.push(f.to_string());
-            }
+            file_paths.push(f.base_file_path().to_string());
         }
         Ok(file_paths)
     }
@@ -178,7 +210,7 @@ impl ProvidesTableMetadata for Table {
     }
 
     fn location(&self) -> String {
-        self.base_path.to_str().unwrap().to_string()
+        self.base_url.path().to_string()
     }
 
     fn partition_fields(&self) -> Vec<String> {
@@ -223,7 +255,10 @@ impl ProvidesTableMetadata for Table {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+    use std::fs::canonicalize;
     use std::path::Path;
+    use url::Url;
 
     use crate::table::config::BaseFileFormat::Parquet;
     use crate::table::config::TableType::CopyOnWrite;
@@ -231,20 +266,62 @@ mod tests {
     use crate::table::Table;
     use crate::test_utils::extract_test_table;
 
+    #[test]
+    fn hudi_table_get_latest_schema() {
+        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+        let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap();
+        let hudi_table = Table::new(base_url.path(), HashMap::new());
+        let fields: Vec<String> = hudi_table
+            .get_latest_schema()
+            .all_fields()
+            .into_iter()
+            .map(|f| f.name().to_string())
+            .collect();
+        assert_eq!(
+            fields,
+            Vec::from([
+                "_hoodie_commit_time",
+                "_hoodie_commit_seqno",
+                "_hoodie_record_key",
+                "_hoodie_partition_path",
+                "_hoodie_file_name",
+                "ts",
+                "uuid",
+                "rider",
+                "driver",
+                "fare",
+                "city"
+            ])
+        );
+    }
+
+    #[test]
+    fn hudi_table_read_file_slice() {
+        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+        let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap();
+        let mut hudi_table = Table::new(base_url.path(), HashMap::new());
+        let batches = hudi_table.read_file_slice(
+            "san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet",
+        );
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches.first().unwrap().num_rows(), 1);
+        assert_eq!(batches.first().unwrap().num_columns(), 11);
+    }
+
     #[test]
     fn hudi_table_get_latest_file_paths() {
         let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-        let target_table_path = extract_test_table(fixture_path);
-        let hudi_table = Table::new(target_table_path.to_str().unwrap());
+        let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap();
+        let mut hudi_table = Table::new(base_url.path(), HashMap::new());
         assert_eq!(hudi_table.get_timeline().unwrap().instants.len(), 2);
         assert_eq!(hudi_table.get_latest_file_paths().unwrap().len(), 5);
-        println!("{}", hudi_table.schema());
     }
 
     #[test]
     fn hudi_table_get_table_metadata() {
-        let fixture_path = Path::new("fixtures/table_metadata/sample_table_properties");
-        let table = Table::new(fixture_path.to_str().unwrap());
+        let base_path =
+            canonicalize(Path::new("fixtures/table_metadata/sample_table_properties")).unwrap();
+        let table = Table::new(base_path.to_str().unwrap(), HashMap::new());
         assert_eq!(table.base_file_format(), Parquet);
         assert_eq!(table.checksum(), 3761586722);
         assert_eq!(table.database_name(), "default");
@@ -256,10 +333,7 @@ mod tests {
             table.key_generator_class(),
             "org.apache.hudi.keygen.SimpleKeyGenerator"
         );
-        assert_eq!(
-            table.location(),
-            "fixtures/table_metadata/sample_table_properties"
-        );
+        assert_eq!(table.location(), base_path.to_str().unwrap());
         assert_eq!(table.partition_fields(), vec!["city"]);
         assert_eq!(table.precombine_field(), "ts");
         assert!(table.populates_meta_fields());
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 3ac0a70..e7f8010 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -25,8 +25,9 @@ use anyhow::{anyhow, Result};
 use arrow_schema::SchemaRef;
 use parquet::arrow::parquet_to_arrow_schema;
 use serde_json::{Map, Value};
+use url::Url;
 
-use crate::storage::file_metadata::split_filename;
+use crate::storage::utils::split_filename;
 use crate::storage::Storage;
 
 #[allow(dead_code)]
@@ -60,24 +61,21 @@ impl Instant {
 
 #[derive(Debug, Clone)]
 pub struct Timeline {
-    pub base_path: String,
+    pub base_url: Url,
     pub instants: Vec<Instant>,
 }
 
 impl Timeline {
-    pub async fn new(base_path: &str) -> Result<Self> {
-        let instants = Self::load_completed_commit_instants(base_path).await?;
-        Ok(Self {
-            base_path: base_path.to_string(),
-            instants,
-        })
+    pub async fn new(base_url: Url) -> Result<Self> {
+        let instants = Self::load_completed_commit_instants(&base_url).await?;
+        Ok(Self { base_url, instants })
     }
 
-    async fn load_completed_commit_instants(base_path: &str) -> Result<Vec<Instant>> {
-        let storage = Storage::new(base_path, HashMap::new());
+    async fn load_completed_commit_instants(base_url: &Url) -> Result<Vec<Instant>> {
+        let storage = Storage::new(base_url.clone(), HashMap::new());
         let mut completed_commits = Vec::new();
-        for file_metadata in storage.list_files(Some(".hoodie")).await {
-            let (file_stem, file_ext) = split_filename(file_metadata.name.as_str())?;
+        for file_info in storage.list_files(Some(".hoodie")).await {
+            let (file_stem, file_ext) = split_filename(file_info.name.as_str())?;
             if file_ext == "commit" {
                 completed_commits.push(Instant {
                     state: State::Completed,
@@ -96,7 +94,7 @@ impl Timeline {
             Some(instant) => {
                 let mut commit_file_path = PathBuf::from(".hoodie");
                 commit_file_path.push(instant.file_name());
-                let storage = Storage::new(&self.base_path, HashMap::new());
+                let storage = Storage::new(self.base_url.clone(), HashMap::new());
                 let bytes = storage
                     .get_file_data(commit_file_path.to_str().unwrap())
                     .await;
@@ -118,7 +116,7 @@ impl Timeline {
             if let Some((_, value)) = partition_to_write_stats.iter().next() {
                 if let Some(first_value) = value.as_array().and_then(|arr| arr.first()) {
                     if let Some(path) = first_value["path"].as_str() {
-                        let storage = Storage::new(&self.base_path, HashMap::new());
+                        let storage = Storage::new(self.base_url.clone(), HashMap::new());
                         let parquet_meta = storage.get_parquet_file_metadata(path).await;
                         let arrow_schema = parquet_to_arrow_schema(
                             parquet_meta.file_metadata().schema_descr(),
@@ -138,6 +136,8 @@ mod tests {
     use std::fs::canonicalize;
     use std::path::Path;
 
+    use url::Url;
+
     use crate::test_utils::extract_test_table;
     use crate::timeline::{Instant, State, Timeline};
 
@@ -145,16 +145,18 @@ mod tests {
     async fn read_latest_schema() {
         let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
         let target_table_path = extract_test_table(fixture_path);
-        let base_path = canonicalize(target_table_path).unwrap();
-        let timeline = Timeline::new(base_path.to_str().unwrap()).await.unwrap();
+        let base_url = Url::from_file_path(canonicalize(target_table_path).unwrap()).unwrap();
+        let timeline = Timeline::new(base_url).await.unwrap();
         let table_schema = timeline.get_latest_schema().await.unwrap();
         assert_eq!(table_schema.fields.len(), 11)
     }
 
     #[tokio::test]
     async fn init_commits_timeline() {
-        let base_path = canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap();
-        let timeline = Timeline::new(base_path.to_str().unwrap()).await.unwrap();
+        let base_url =
+            Url::from_file_path(canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap())
+                .unwrap();
+        let timeline = Timeline::new(base_url).await.unwrap();
         assert_eq!(
             timeline.instants,
             vec![
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 56aae9d..7025a8c 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -19,6 +19,7 @@
 
 use arrow_array::RecordBatch;
 use std::any::Any;
+use std::collections::HashMap;
 use std::fmt::Debug;
 use std::fs::File;
 use std::sync::Arc;
@@ -45,7 +46,7 @@ pub struct HudiDataSource {
 impl HudiDataSource {
     pub fn new(base_path: &str) -> Self {
         Self {
-            table: HudiTable::new(base_path),
+            table: HudiTable::new(base_path, HashMap::new()),
         }
     }
     pub(crate) async fn create_physical_plan(
@@ -56,7 +57,7 @@ impl HudiDataSource {
         Ok(Arc::new(HudiExec::new(projections, schema, self.clone())))
     }
 
-    fn get_record_batches(&self) -> datafusion_common::Result<Vec<RecordBatch>> {
+    fn get_record_batches(&mut self) -> datafusion_common::Result<Vec<RecordBatch>> {
         match self.table.get_latest_file_paths() {
             Ok(file_paths) => {
                 let mut record_batches = Vec::new();
@@ -84,7 +85,7 @@ impl TableProvider for HudiDataSource {
     }
 
     fn schema(&self) -> SchemaRef {
-        self.table.schema()
+        self.table.get_latest_schema()
     }
 
     fn table_type(&self) -> TableType {
@@ -161,7 +162,8 @@ impl ExecutionPlan for HudiExec {
         _partition: usize,
         _context: Arc<TaskContext>,
     ) -> datafusion_common::Result<SendableRecordBatchStream> {
-        let data = self.data_source.get_record_batches()?;
+        let mut data_source = self.data_source.clone();
+        let data = data_source.get_record_batches()?;
         Ok(Box::pin(MemoryStream::try_new(data, self.schema(), None)?))
     }
 }
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 6c5702b..613b010 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -27,10 +27,22 @@ doc = false
 
 [dependencies]
 object_store = { workspace = true }
+# arrow
+arrow = { workspace = true }
+arrow-arith = { workspace = true }
+arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
+arrow-cast = { workspace = true }
+arrow-ipc = { workspace = true }
+arrow-json = { workspace = true }
+arrow-ord = { workspace = true }
+arrow-row = { workspace = true }
+arrow-schema = { workspace = true }
+arrow-select = { workspace = true }
 
 [dependencies.pyo3]
-version = "0.21.2"
-features = ["extension-module", "abi3", "abi3-py38", "gil-refs"]
+version = "0.20.3"
+features = ["extension-module", "abi3", "abi3-py38"]
 
 [dependencies.hudi]
 path = "../crates/hudi"
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index fa70485..8054368 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -17,4 +17,5 @@
 
 from ._internal import __version__ as __version__
 from ._internal import rust_core_version as rust_core_version
+from ._internal import HudiFileSlice as HudiFileSlice
 from .table import HudiTable as HudiTable
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 8759c2e..b34c25a 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -15,7 +15,9 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-from typing import List
+from typing import List, Dict, Optional
+
+import pyarrow
 
 __version__: str
 
@@ -23,8 +25,26 @@ __version__: str
 def rust_core_version() -> str: ...
 
 
+class HudiFileSlice:
+    file_group_id: str
+    partition_path: str
+    commit_time: str
+    base_file_name: str
+    base_file_path: str
+    base_file_size: int
+    num_records: int
+
+
 class BindingHudiTable:
 
-    def __init__(self, table_uri: str): ...
+    def __init__(
+            self,
+            table_uri: str,
+            storage_options: Optional[Dict[str, str]] = None,
+    ): ...
+
+    def schema(self) -> "pyarrow.Schema": ...
+
+    def get_latest_file_slices(self) -> List[HudiFileSlice]: ...
 
-    def get_latest_file_paths(self) -> List[str]: ...
+    def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]: ...
diff --git a/python/hudi/_internal.pyi b/python/hudi/_utils.py
similarity index 78%
copy from python/hudi/_internal.pyi
copy to python/hudi/_utils.py
index 8759c2e..779a14b 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_utils.py
@@ -14,17 +14,10 @@
 #  KIND, either express or implied.  See the License for the
 #  specific language governing permissions and limitations
 #  under the License.
+from typing import List, Any, Iterator
 
-from typing import List
 
-__version__: str
-
-
-def rust_core_version() -> str: ...
-
-
-class BindingHudiTable:
-
-    def __init__(self, table_uri: str): ...
-
-    def get_latest_file_paths(self) -> List[str]: ...
+def split_list(lst: List[Any], n: int) -> Iterator[List[Any]]:
+    split_size = (len(lst) + n - 1) // n
+    for i in range(0, len(lst), split_size):
+        yield lst[i: i + split_size]
diff --git a/python/hudi/table.py b/python/hudi/table.py
index a8cfbcb..c9cab1b 100644
--- a/python/hudi/table.py
+++ b/python/hudi/table.py
@@ -15,18 +15,37 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
+import os
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Union, List
+from typing import Union, List, Iterator, Optional, Dict
 
-from python.hudi._internal import BindingHudiTable
+import pyarrow
+
+from hudi._internal import BindingHudiTable, HudiFileSlice
+from hudi._utils import split_list
 
 
 @dataclass(init=False)
 class HudiTable:
 
-    def __init__(self, table_uri: Union[str, Path, ""]):
-        self._table = BindingHudiTable(str(table_uri))
+    def __init__(
+            self,
+            table_uri: Union[str, Path, "os.PathLike[str]"],
+            storage_options: Optional[Dict[str, str]] = None,
+    ):
+        self._table = BindingHudiTable(str(table_uri), storage_options)
+
+    def schema(self) -> "pyarrow.Schema":
+        return self._table.schema()
+
+    def split_latest_file_slices(self, n) -> Iterator[List[HudiFileSlice]]:
+        file_slices = self.get_latest_file_slices()
+        for split in split_list(file_slices, n):
+            yield split
+
+    def get_latest_file_slices(self) -> List[HudiFileSlice]:
+        return self._table.get_latest_file_slices()
 
-    def get_latest_file_paths(self) -> List[str]:
-        return self._table.get_latest_file_paths()
+    def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]:
+        return self._table.read_file_slice(relative_path)
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 1abc6c1..175773c 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -22,6 +22,7 @@ build-backend = "maturin"
 [project]
 name = "hudi"
 description = "Native Hudi Python binding based on hudi-rs"
+urls = { repository = "https://github.com/apache/hudi-rs/tree/main/python/" }
 readme = "README.md"
 requires-python = ">=3.8"
 license = "Apache License 2.0"
@@ -37,8 +38,22 @@ dependencies = [
     "pyarrow>=8"
 ]
 
+optional-dependencies = { devel = [
+    "pytest"
+] }
+
 dynamic = ["version"]
 
 [tool.maturin]
 module-name = "hudi._internal"
 features = ["pyo3/extension-module"]
+
+[tool.mypy]
+files = "hudi/*.py"
+exclude = "^tests"
+
+[tool.pytest.ini_options]
+testpaths = [
+    "tests",
+    "hudi",
+]
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 8ac33f4..07c81b0 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -17,35 +17,91 @@
  * under the License.
  */
 
+use std::collections::HashMap;
+use std::path::PathBuf;
+
+use arrow::pyarrow::ToPyArrow;
 use pyo3::prelude::*;
 
-use hudi::table::Table;
+use hudi::file_group::FileSlice;
+use hudi::HudiTable;
+
+#[pyclass]
+struct HudiFileSlice {
+    #[pyo3(get)]
+    file_group_id: String,
+    #[pyo3(get)]
+    partition_path: String,
+    #[pyo3(get)]
+    commit_time: String,
+    #[pyo3(get)]
+    base_file_name: String,
+    #[pyo3(get)]
+    base_file_path: String,
+    #[pyo3(get)]
+    base_file_size: usize,
+    #[pyo3(get)]
+    num_records: i64,
+}
+
+impl HudiFileSlice {
+    pub fn from_file_slice(f: FileSlice) -> Self {
+        let partition_path = f.partition_path.clone().unwrap_or("".to_string());
+        let mut p = PathBuf::from(&partition_path);
+        p.push(f.base_file.info.name.clone());
+        let base_file_path = p.to_str().unwrap().to_string();
+        Self {
+            file_group_id: f.file_group_id().to_string(),
+            partition_path,
+            commit_time: f.base_file.commit_time,
+            base_file_name: f.base_file.info.name,
+            base_file_path,
+            base_file_size: f.base_file.info.size,
+            num_records: f.base_file.stats.unwrap().num_records,
+        }
+    }
+}
 
 #[pyclass]
 struct BindingHudiTable {
-    _table: hudi::HudiTable,
+    _table: HudiTable,
 }
 
 #[pymethods]
 impl BindingHudiTable {
     #[new]
-    #[pyo3(signature = (table_uri))]
-    fn new(py: Python, table_uri: &str) -> PyResult<Self> {
+    #[pyo3(signature = (table_uri, storage_options = None))]
+    fn new(
+        py: Python,
+        table_uri: &str,
+        storage_options: Option<HashMap<String, String>>,
+    ) -> PyResult<Self> {
         py.allow_threads(|| {
             Ok(BindingHudiTable {
-                _table: Table::new(table_uri),
+                _table: HudiTable::new(table_uri, storage_options.unwrap_or_default()),
             })
         })
     }
 
-    pub fn get_latest_file_paths(&self) -> PyResult<Vec<String>> {
-        match self._table.get_latest_file_paths() {
-            Ok(paths) => Ok(paths),
+    pub fn schema(&self, py: Python) -> PyResult<PyObject> {
+        self._table.get_latest_schema().to_pyarrow(py)
+    }
+
+    pub fn get_latest_file_slices(&mut self) -> PyResult<Vec<HudiFileSlice>> {
+        match self._table.get_latest_file_slices() {
+            Ok(file_slices) => Ok(file_slices
+                .into_iter()
+                .map(HudiFileSlice::from_file_slice)
+                .collect()),
             Err(_e) => {
-                panic!("Failed to retrieve the latest file paths.")
+                panic!("Failed to retrieve the latest file slices.")
             }
         }
     }
+
+    pub fn read_file_slice(&mut self, relative_path: &str, py: Python) -> PyResult<PyObject> {
+        self._table.read_file_slice(relative_path).to_pyarrow(py)
+    }
 }
 
 #[pyfunction]
@@ -58,6 +114,7 @@ fn _internal(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add("__version__", env!("CARGO_PKG_VERSION"))?;
     m.add_function(wrap_pyfunction!(rust_core_version, m)?)?;
 
+    m.add_class::<HudiFileSlice>()?;
     m.add_class::<BindingHudiTable>()?;
     Ok(())
 }
diff --git a/python/hudi/__init__.py b/python/tests/__init__.py
similarity index 83%
copy from python/hudi/__init__.py
copy to python/tests/__init__.py
index fa70485..a67d5ea 100644
--- a/python/hudi/__init__.py
+++ b/python/tests/__init__.py
@@ -14,7 +14,3 @@
 #  KIND, either express or implied.  See the License for the
 #  specific language governing permissions and limitations
 #  under the License.
-
-from ._internal import __version__ as __version__
-from ._internal import rust_core_version as rust_core_version
-from .table import HudiTable as HudiTable
diff --git a/python/hudi/table.py b/python/tests/conftest.py
similarity index 58%
copy from python/hudi/table.py
copy to python/tests/conftest.py
index a8cfbcb..2dcfdeb 100644
--- a/python/hudi/table.py
+++ b/python/tests/conftest.py
@@ -15,18 +15,26 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-from dataclasses import dataclass
+import os
+import zipfile
 from pathlib import Path
-from typing import Union, List
 
-from python.hudi._internal import BindingHudiTable
+import pytest
 
 
-@dataclass(init=False)
-class HudiTable:
+def _extract_testing_table(zip_file_path, target_path) -> str:
+    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
+        zip_ref.extractall(target_path)
+    return os.path.join(target_path, "trips_table")
 
-    def __init__(self, table_uri: Union[str, Path, ""]):
-        self._table = BindingHudiTable(str(table_uri))
 
-    def get_latest_file_paths(self) -> List[str]:
-        return self._table.get_latest_file_paths()
+@pytest.fixture(
+    params=[
+        "0.x_cow_partitioned",
+    ]
+)
+def get_sample_table(request, tmp_path) -> str:
+    fixture_path = "../crates/core/fixtures/table"
+    table_name = request.param
+    zip_file_path = Path(fixture_path).joinpath(f"{table_name}.zip")
+    return _extract_testing_table(zip_file_path, tmp_path)
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
new file mode 100644
index 0000000..20ce42c
--- /dev/null
+++ b/python/tests/test_table_read.py
@@ -0,0 +1,53 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import pyarrow as pa
+import pytest
+
+from hudi import HudiTable
+
+PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0)
+pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="hudi only supported if pyarrow >= 8.0.0")
+
+
+def test_sample_table(get_sample_table):
+    table_path = get_sample_table
+    table = HudiTable(table_path, {})
+
+    assert table.schema().names == ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
+                                    '_hoodie_partition_path', '_hoodie_file_name', 'ts', 'uuid', 'rider', 'driver',
+                                    'fare', 'city']
+
+    file_slices = table.get_latest_file_slices()
+    assert len(file_slices) == 5
+    assert set(f.commit_time for f in file_slices) == {'20240402123035233', '20240402144910683'}
+    assert all(f.num_records == 1 for f in file_slices)
+    file_slice_paths = [f.base_file_path for f in file_slices]
+    assert set(file_slice_paths) == {'chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet',
+                                'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet',
+                                'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet',
+                                'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet',
+                                'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'}
+
+    batches = table.read_file_slice(file_slice_paths[0])
+    t = pa.Table.from_batches(batches)
+    assert t.num_rows == 1
+    assert t.num_columns == 11
+
+    file_slices_gen = table.split_latest_file_slices(2)
+    assert len(next(file_slices_gen)) == 3
+    assert len(next(file_slices_gen)) == 2
