This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 52a9245  refactor: improve error handling in storage module (#34)
52a9245 is described below

commit 52a924557ee18effadc02749ec7cdb1001ad6b58
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Tue Jul 2 22:18:26 2024 -0500

    refactor: improve error handling in storage module (#34)
---
 crates/core/fixtures/leaf_dir/.gitkeep |   0
 crates/core/src/storage/mod.rs         | 198 +++++++++++++++++++++------------
 crates/core/src/table/fs_view.rs       |  15 +--
 crates/core/src/table/mod.rs           |  11 +-
 crates/core/src/table/timeline.rs      |   4 +-
 python/hudi/_internal.pyi              |   2 +-
 python/hudi/table.py                   |   2 +-
 python/tests/test_table_read.py        |   6 +-
 8 files changed, 143 insertions(+), 95 deletions(-)

diff --git a/crates/core/fixtures/leaf_dir/.gitkeep b/crates/core/fixtures/leaf_dir/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 0f09c05..43dd0e7 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -21,7 +21,8 @@ use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
 
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
+use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
 use async_recursion::async_recursion;
 use bytes::Bytes;
@@ -60,16 +61,21 @@ impl Storage {
         }
     }
 
-    #[allow(dead_code)]
-    pub async fn get_file_info(&self, relative_path: &str) -> FileInfo {
-        let obj_url = join_url_segments(&self.base_url, 
&[relative_path]).unwrap();
-        let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
-        let meta = self.object_store.head(&obj_path).await.unwrap();
-        FileInfo {
-            uri: obj_url.to_string(),
-            name: obj_path.filename().unwrap().to_string(),
+    #[cfg(test)]
+    async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let meta = self.object_store.head(&obj_path).await?;
+        let uri = obj_url.to_string();
+        let name = obj_path
+            .filename()
+            .ok_or(anyhow!("Failed to get file name for {}", obj_path))?
+            .to_string();
+        Ok(FileInfo {
+            uri,
+            name,
             size: meta.size,
-        }
+        })
     }
 
     pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> Result<ParquetMetaData> {
@@ -79,79 +85,100 @@ impl Storage {
         let meta = obj_store.head(&obj_path).await?;
         let reader = ParquetObjectReader::new(obj_store, meta);
         let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
-        Ok(builder.metadata().as_ref().to_owned())
+        Ok(builder.metadata().as_ref().clone())
     }
 
-    pub async fn get_file_data(&self, relative_path: &str) -> Bytes {
-        let obj_url = join_url_segments(&self.base_url, 
&[relative_path]).unwrap();
-        let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
-        let result = self.object_store.get(&obj_path).await.unwrap();
-        result.bytes().await.unwrap()
+    pub async fn get_file_data(&self, relative_path: &str) -> Result<Bytes> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let result = self.object_store.get(&obj_path).await?;
+        let bytes = result.bytes().await?;
+        Ok(bytes)
     }
 
-    pub async fn get_parquet_file_data(&self, relative_path: &str) -> 
Vec<RecordBatch> {
-        let obj_url = join_url_segments(&self.base_url, 
&[relative_path]).unwrap();
-        let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
+    pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result<RecordBatch> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
         let obj_store = self.object_store.clone();
-        let meta = obj_store.head(&obj_path).await.unwrap();
+        let meta = obj_store.head(&obj_path).await?;
+
+        // read parquet
         let reader = ParquetObjectReader::new(obj_store, meta);
-        let stream = ParquetRecordBatchStreamBuilder::new(reader)
-            .await
-            .unwrap()
-            .build()
-            .unwrap();
-        stream
-            .collect::<Vec<_>>()
-            .await
-            .into_iter()
-            .map(|r| r.unwrap())
-            .collect()
+        let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+        let schema = builder.schema().clone();
+        let mut stream = builder.build()?;
+        let mut batches = Vec::new();
+
+        while let Some(r) = stream.next().await {
+            let batch = r.context("Failed to read record batch.")?;
+            batches.push(batch)
+        }
+
+        if batches.is_empty() {
+            return Ok(RecordBatch::new_empty(schema.clone()));
+        }
+
+        concat_batches(&schema, &batches)
+            .map_err(|e| anyhow!("Failed to concat record batches: {}", e))
     }
 
-    pub async fn list_dirs(&self, subdir: Option<&str>) -> Vec<String> {
-        self.list_dirs_as_obj_paths(subdir)
-            .await
-            .into_iter()
-            .map(|p| p.filename().unwrap().to_string())
-            .collect()
+    pub async fn list_dirs(&self, subdir: Option<&str>) -> Result<Vec<String>> {
+        let dir_paths = self.list_dirs_as_obj_paths(subdir).await?;
+        let mut dirs = Vec::new();
+        for dir in dir_paths {
+            dirs.push(
+                dir.filename()
+                    .ok_or(anyhow!("Failed to get file name for {}", dir))?
+                    .to_string(),
+            )
+        }
+        Ok(dirs)
     }
 
-    async fn list_dirs_as_obj_paths(&self, subdir: Option<&str>) -> 
Vec<ObjPath> {
-        let prefix_url = join_url_segments(&self.base_url, 
&[subdir.unwrap_or_default()]).unwrap();
-        let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
-        self.object_store
+    async fn list_dirs_as_obj_paths(&self, subdir: Option<&str>) -> Result<Vec<ObjPath>> {
+        let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()])?;
+        let prefix_path = ObjPath::from_url_path(prefix_url.path())?;
+        let list_res = self
+            .object_store
             .list_with_delimiter(Some(&prefix_path))
-            .await
-            .unwrap()
-            .common_prefixes
+            .await?;
+        Ok(list_res.common_prefixes)
     }
 
-    pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileInfo> {
-        let prefix_url = join_url_segments(&self.base_url, 
&[subdir.unwrap_or_default()]).unwrap();
-        let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
-        self.object_store
+    pub async fn list_files(&self, subdir: Option<&str>) -> Result<Vec<FileInfo>> {
+        let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()])?;
+        let prefix_path = ObjPath::from_url_path(prefix_url.path())?;
+        let list_res = self
+            .object_store
             .list_with_delimiter(Some(&prefix_path))
-            .await
-            .unwrap()
-            .objects
-            .into_iter()
-            .map(|obj_meta| FileInfo {
-                uri: join_url_segments(&prefix_url, 
&[obj_meta.location.filename().unwrap()])
-                    .unwrap()
-                    .to_string(),
-                name: obj_meta.location.filename().unwrap().to_string(),
+            .await?;
+        let mut file_info = Vec::new();
+        for obj_meta in list_res.objects {
+            let name = obj_meta
+                .location
+                .filename()
+                .ok_or(anyhow!(
+                    "Failed to get file name for {:?}",
+                    obj_meta.location
+                ))?
+                .to_string();
+            let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
+            file_info.push(FileInfo {
+                uri,
+                name,
                 size: obj_meta.size,
-            })
-            .collect()
+            });
+        }
+        Ok(file_info)
     }
 }
 
 #[async_recursion]
-pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> 
Vec<String> {
+pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result<Vec<String>> {
     let mut leaf_dirs = Vec::new();
-    let child_dirs = storage.list_dirs(subdir).await;
+    let child_dirs = storage.list_dirs(subdir).await?;
     if child_dirs.is_empty() {
-        leaf_dirs.push(subdir.unwrap().to_owned());
+        leaf_dirs.push(subdir.unwrap_or_default().to_owned());
     } else {
         for child_dir in child_dirs {
             let mut next_subdir = PathBuf::new();
@@ -159,11 +186,14 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Vec<Strin
                 next_subdir.push(curr);
             }
             next_subdir.push(child_dir);
-            let curr_leaf_dir = get_leaf_dirs(storage, 
Some(next_subdir.to_str().unwrap())).await;
+            let next_subdir = next_subdir
+                .to_str()
+                .ok_or(anyhow!("Failed to convert path: {:?}", next_subdir))?;
+            let curr_leaf_dir = get_leaf_dirs(storage, Some(next_subdir)).await?;
             leaf_dirs.extend(curr_leaf_dir);
         }
     }
-    leaf_dirs
+    Ok(leaf_dirs)
 }
 
 #[cfg(test)]
@@ -187,7 +217,8 @@ mod tests {
         )
         .unwrap();
         let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
-        let first_level_dirs: HashSet<String> = 
storage.list_dirs(None).await.into_iter().collect();
+        let first_level_dirs: HashSet<String> =
+            storage.list_dirs(None).await.unwrap().into_iter().collect();
         assert_eq!(
             first_level_dirs,
             vec![".hoodie", "part1", "part2", "part3"]
@@ -195,9 +226,9 @@ mod tests {
                 .map(String::from)
                 .collect()
         );
-        let second_level_dirs: Vec<String> = 
storage.list_dirs(Some("part2")).await;
+        let second_level_dirs: Vec<String> = 
storage.list_dirs(Some("part2")).await.unwrap();
         assert_eq!(second_level_dirs, vec!["part22"]);
-        let no_dirs = storage.list_dirs(Some("part1")).await;
+        let no_dirs = storage.list_dirs(Some("part1")).await.unwrap();
         assert!(no_dirs.is_empty());
     }
 
@@ -211,6 +242,7 @@ mod tests {
         let first_level_dirs: HashSet<ObjPath> = storage
             .list_dirs_as_obj_paths(None)
             .await
+            .unwrap()
             .into_iter()
             .collect();
         let expected_paths: HashSet<ObjPath> = vec![".hoodie", "part1", 
"part2", "part3"]
@@ -230,7 +262,12 @@ mod tests {
         )
         .unwrap();
         let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
-        let file_info_1: Vec<FileInfo> = 
storage.list_files(None).await.into_iter().collect();
+        let file_info_1: Vec<FileInfo> = storage
+            .list_files(None)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect();
         assert_eq!(
             file_info_1,
             vec![FileInfo {
@@ -242,6 +279,7 @@ mod tests {
         let file_info_2: Vec<FileInfo> = storage
             .list_files(Some("part1"))
             .await
+            .unwrap()
             .into_iter()
             .collect();
         assert_eq!(
@@ -259,6 +297,7 @@ mod tests {
         let file_info_3: Vec<FileInfo> = storage
             .list_files(Some("part2/part22"))
             .await
+            .unwrap()
             .into_iter()
             .collect();
         assert_eq!(
@@ -282,19 +321,33 @@ mod tests {
         )
         .unwrap();
         let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
-        let leaf_dirs = get_leaf_dirs(&storage, None).await;
+        let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
         assert_eq!(
             leaf_dirs,
             vec![".hoodie", "part1", "part2/part22", "part3/part32/part33"]
         );
     }
 
+    #[tokio::test]
+    async fn use_storage_to_get_leaf_dirs_for_leaf_dir() {
+        let base_url =
+            
Url::from_directory_path(canonicalize(Path::new("fixtures/leaf_dir")).unwrap())
+                .unwrap();
+        let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
+        let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
+        assert_eq!(
+            leaf_dirs,
+            vec![""],
+            "Listing a leaf dir should get the relative path to itself."
+        );
+    }
+
     #[tokio::test]
     async fn storage_get_file_info() {
         let base_url =
             
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
         let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
-        let file_info = storage.get_file_info("a.parquet").await;
+        let file_info = storage.get_file_info("a.parquet").await.unwrap();
         assert_eq!(file_info.name, "a.parquet");
         assert_eq!(
             file_info.uri,
@@ -308,8 +361,7 @@ mod tests {
         let base_url =
             
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
         let storage = Storage::new(Arc::new(base_url), 
Arc::new(HashMap::new())).unwrap();
-        let file_data = storage.get_parquet_file_data("a.parquet").await;
-        assert_eq!(file_data.len(), 1);
-        assert_eq!(file_data.first().unwrap().num_rows(), 5);
+        let file_data = 
storage.get_parquet_file_data("a.parquet").await.unwrap();
+        assert_eq!(file_data.num_rows(), 5);
     }
 }
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index b7cd77c..ad4f5f5 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -58,13 +58,13 @@ impl FileSystemView {
     async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> {
         let top_level_dirs: Vec<String> = storage
             .list_dirs(None)
-            .await
+            .await?
             .into_iter()
             .filter(|dir| dir != ".hoodie")
             .collect();
         let mut partition_paths = Vec::new();
         for dir in top_level_dirs {
-            partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await);
+            partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await?);
         }
         if partition_paths.is_empty() {
             partition_paths.push("".to_string())
@@ -94,7 +94,7 @@ impl FileSystemView {
     ) -> Result<Vec<FileGroup>> {
         let file_info: Vec<FileInfo> = storage
             .list_files(Some(partition_path))
-            .await
+            .await?
             .into_iter()
             .filter(|f| f.name.ends_with(".parquet"))
             .collect();
@@ -152,13 +152,10 @@ impl FileSystemView {
     pub async fn read_file_slice_by_path_unchecked(
         &self,
         relative_path: &str,
-    ) -> Result<Vec<RecordBatch>> {
-        Ok(self.storage.get_parquet_file_data(relative_path).await)
+    ) -> Result<RecordBatch> {
+        self.storage.get_parquet_file_data(relative_path).await
     }
-    pub async fn read_file_slice_unchecked(
-        &self,
-        file_slice: &FileSlice,
-    ) -> Result<Vec<RecordBatch>> {
+    pub async fn read_file_slice_unchecked(&self, file_slice: &FileSlice) -> Result<RecordBatch> {
         
self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
             .await
     }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 52daf12..d1be74f 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -85,7 +85,7 @@ impl Table {
         storage_options: Arc<HashMap<String, String>>,
     ) -> Result<HashMap<String, String>> {
         let storage = Storage::new(base_url, storage_options)?;
-        let data = storage.get_file_data(".hoodie/hoodie.properties").await;
+        let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
         let cursor = std::io::Cursor::new(data);
         let lines = BufReader::new(cursor).lines();
         let mut properties: HashMap<String, String> = HashMap::new();
@@ -146,7 +146,7 @@ impl Table {
         let mut batches = Vec::new();
         for f in file_slices {
             match self.file_system_view.read_file_slice_unchecked(&f).await {
-                Ok(batch) => batches.extend(batch),
+                Ok(batch) => batches.push(batch),
                 Err(e) => return Err(anyhow!("Failed to read file slice {:?} - 
{}", f, e)),
             }
         }
@@ -162,7 +162,7 @@ impl Table {
         Ok(file_paths)
     }
 
-    pub async fn read_file_slice_by_path(&self, relative_path: &str) -> 
Result<Vec<RecordBatch>> {
+    pub async fn read_file_slice_by_path(&self, relative_path: &str) -> Result<RecordBatch> {
         self.file_system_view
             .read_file_slice_by_path_unchecked(relative_path)
             .await
@@ -329,9 +329,8 @@ mod tests {
             )
             .await
             .unwrap();
-        assert_eq!(batches.len(), 1);
-        assert_eq!(batches.first().unwrap().num_rows(), 4);
-        assert_eq!(batches.first().unwrap().num_columns(), 21);
+        assert_eq!(batches.num_rows(), 4);
+        assert_eq!(batches.num_columns(), 21);
     }
 
     #[tokio::test]
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index 70fc6ee..3a68387 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -98,7 +98,7 @@ impl Timeline {
 
     async fn load_completed_commit_instants(storage: &Storage) -> 
Result<Vec<Instant>> {
         let mut completed_commits = Vec::new();
-        for file_info in storage.list_files(Some(".hoodie")).await {
+        for file_info in storage.list_files(Some(".hoodie")).await? {
             let (file_stem, file_ext) = 
split_filename(file_info.name.as_str())?;
             if file_ext == "commit" {
                 completed_commits.push(Instant {
@@ -128,7 +128,7 @@ impl Timeline {
                     "Failed to get commit file path for instant: {:?}",
                     instant
                 ))?;
-                let bytes = self.storage.get_file_data(relative_path).await;
+                let bytes = self.storage.get_file_data(relative_path).await?;
                 let json: Value = serde_json::from_slice(&bytes)?;
                 let commit_metadata = json
                     .as_object()
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 67dd0cc..b91b492 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -49,7 +49,7 @@ class BindingHudiTable:
 
     def get_file_slices(self) -> List[HudiFileSlice]: ...
 
-    def read_file_slice(self, base_file_relative_path) -> 
List["pyarrow.RecordBatch"]: ...
+    def read_file_slice(self, base_file_relative_path) -> pyarrow.RecordBatch: ...
 
     def read_snapshot(self) -> List["pyarrow.RecordBatch"]: ...
 
diff --git a/python/hudi/table.py b/python/hudi/table.py
index 943f423..def024d 100644
--- a/python/hudi/table.py
+++ b/python/hudi/table.py
@@ -47,7 +47,7 @@ class HudiTable:
     def get_file_slices(self) -> List[HudiFileSlice]:
         return self._table.get_file_slices()
 
-    def read_file_slice(self, base_file_relative_path: str) -> 
List["pyarrow.RecordBatch"]:
+    def read_file_slice(self, base_file_relative_path: str) -> "pyarrow.RecordBatch":
         return self._table.read_file_slice(base_file_relative_path)
 
     def read_snapshot(self) -> List["pyarrow.RecordBatch"]:
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index aec6d70..883958c 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -36,15 +36,15 @@ def test_sample_table(get_sample_table):
     assert len(file_slices) == 5
     assert set(f.commit_time for f in file_slices) == {'20240402123035233', 
'20240402144910683'}
     assert all(f.num_records == 1 for f in file_slices)
-    file_slice_paths = [f.base_file_relative_path() for f in file_slices]
+    file_slice_paths = [f.base_file_relative_path for f in file_slices]
     assert set(file_slice_paths) == 
{'chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet',
                                      
'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet',
                                      
'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet',
                                      
'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet',
                                      
'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'}
 
-    batches = table.read_file_slice(file_slice_paths[0])
-    t = pa.Table.from_batches(batches)
+    batch = table.read_file_slice(file_slice_paths[0])
+    t = pa.Table.from_batches([batch])
     assert t.num_rows == 1
     assert t.num_columns == 11
 

Reply via email to