This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
     new 52a9245  refactor: improve error handling in storage module (#34)
52a9245 is described below

commit 52a924557ee18effadc02749ec7cdb1001ad6b58
Author:     Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Tue Jul 2 22:18:26 2024 -0500

    refactor: improve error handling in storage module (#34)
---
 crates/core/fixtures/leaf_dir/.gitkeep  |   0
 crates/core/src/storage/mod.rs          | 198 +++++++++++++++++++++------------
 crates/core/src/table/fs_view.rs        |  15 +--
 crates/core/src/table/mod.rs            |  11 +-
 crates/core/src/table/timeline.rs       |   4 +-
 python/hudi/_internal.pyi               |   2 +-
 python/hudi/table.py                    |   2 +-
 python/tests/test_table_read.py         |   6 +-
 8 files changed, 143 insertions(+), 95 deletions(-)

diff --git a/crates/core/fixtures/leaf_dir/.gitkeep b/crates/core/fixtures/leaf_dir/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 0f09c05..43dd0e7 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -21,7 +21,8 @@
 use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, Context, Result};
+use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
 use async_recursion::async_recursion;
 use bytes::Bytes;
@@ -60,16 +61,21 @@ impl Storage {
         }
     }
 
-    #[allow(dead_code)]
-    pub async fn get_file_info(&self, relative_path: &str) -> FileInfo {
-        let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
-        let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
-        let meta = self.object_store.head(&obj_path).await.unwrap();
-        FileInfo {
-            uri: obj_url.to_string(),
-            name: obj_path.filename().unwrap().to_string(),
+    #[cfg(test)]
+    async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let meta = self.object_store.head(&obj_path).await?;
+        let uri = obj_url.to_string();
+        let name = obj_path
+            .filename()
+            .ok_or(anyhow!("Failed to get file name for {}", obj_path))?
+            .to_string();
+        Ok(FileInfo {
+            uri,
+            name,
             size: meta.size,
-        }
+        })
     }
 
     pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> Result<ParquetMetaData> {
@@ -79,79 +85,100 @@ impl Storage {
         let meta = obj_store.head(&obj_path).await?;
         let reader = ParquetObjectReader::new(obj_store, meta);
         let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
-        Ok(builder.metadata().as_ref().to_owned())
+        Ok(builder.metadata().as_ref().clone())
     }
 
-    pub async fn get_file_data(&self, relative_path: &str) -> Bytes {
-        let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
-        let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
-        let result = self.object_store.get(&obj_path).await.unwrap();
-        result.bytes().await.unwrap()
+    pub async fn get_file_data(&self, relative_path: &str) -> Result<Bytes> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let result = self.object_store.get(&obj_path).await?;
+        let bytes = result.bytes().await?;
+        Ok(bytes)
     }
 
-    pub async fn get_parquet_file_data(&self, relative_path: &str) -> Vec<RecordBatch> {
-        let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
-        let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
+    pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result<RecordBatch> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
         let obj_store = self.object_store.clone();
-        let meta = obj_store.head(&obj_path).await.unwrap();
+        let meta = obj_store.head(&obj_path).await?;
+
+        // read parquet
         let reader = ParquetObjectReader::new(obj_store, meta);
-        let stream = ParquetRecordBatchStreamBuilder::new(reader)
-            .await
-            .unwrap()
-            .build()
-            .unwrap();
-        stream
-            .collect::<Vec<_>>()
-            .await
-            .into_iter()
-            .map(|r| r.unwrap())
-            .collect()
+        let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+        let schema = builder.schema().clone();
+        let mut stream = builder.build()?;
+        let mut batches = Vec::new();
+
+        while let Some(r) = stream.next().await {
+            let batch = r.context("Failed to read record batch.")?;
+            batches.push(batch)
+        }
+
+        if batches.is_empty() {
+            return Ok(RecordBatch::new_empty(schema.clone()));
+        }
+
+        concat_batches(&schema, &batches)
+            .map_err(|e| anyhow!("Failed to concat record batches: {}", e))
     }
 
-    pub async fn list_dirs(&self, subdir: Option<&str>) -> Vec<String> {
-        self.list_dirs_as_obj_paths(subdir)
-            .await
-            .into_iter()
-            .map(|p| p.filename().unwrap().to_string())
-            .collect()
+    pub async fn list_dirs(&self, subdir: Option<&str>) -> Result<Vec<String>> {
+        let dir_paths = self.list_dirs_as_obj_paths(subdir).await?;
+        let mut dirs = Vec::new();
+        for dir in dir_paths {
+            dirs.push(
+                dir.filename()
+                    .ok_or(anyhow!("Failed to get file name for {}", dir))?
+                    .to_string(),
+            )
+        }
+        Ok(dirs)
     }
 
-    async fn list_dirs_as_obj_paths(&self, subdir: Option<&str>) -> Vec<ObjPath> {
-        let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap();
-        let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
-        self.object_store
+    async fn list_dirs_as_obj_paths(&self, subdir: Option<&str>) -> Result<Vec<ObjPath>> {
+        let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()])?;
+        let prefix_path = ObjPath::from_url_path(prefix_url.path())?;
+        let list_res = self
+            .object_store
             .list_with_delimiter(Some(&prefix_path))
-            .await
-            .unwrap()
-            .common_prefixes
+            .await?;
+        Ok(list_res.common_prefixes)
    }
 
-    pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileInfo> {
-        let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap();
-        let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
-        self.object_store
+    pub async fn list_files(&self, subdir: Option<&str>) -> Result<Vec<FileInfo>> {
+        let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()])?;
+        let prefix_path = ObjPath::from_url_path(prefix_url.path())?;
+        let list_res = self
+            .object_store
             .list_with_delimiter(Some(&prefix_path))
-            .await
-            .unwrap()
-            .objects
-            .into_iter()
-            .map(|obj_meta| FileInfo {
-                uri: join_url_segments(&prefix_url, &[obj_meta.location.filename().unwrap()])
-                    .unwrap()
-                    .to_string(),
-                name: obj_meta.location.filename().unwrap().to_string(),
+            .await?;
+        let mut file_info = Vec::new();
+        for obj_meta in list_res.objects {
+            let name = obj_meta
+                .location
+                .filename()
+                .ok_or(anyhow!(
+                    "Failed to get file name for {:?}",
+                    obj_meta.location
+                ))?
+                .to_string();
+            let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
+            file_info.push(FileInfo {
+                uri,
+                name,
                 size: obj_meta.size,
-            })
-            .collect()
+            });
+        }
+        Ok(file_info)
     }
 }
 
 #[async_recursion]
-pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Vec<String> {
+pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result<Vec<String>> {
     let mut leaf_dirs = Vec::new();
-    let child_dirs = storage.list_dirs(subdir).await;
+    let child_dirs = storage.list_dirs(subdir).await?;
     if child_dirs.is_empty() {
-        leaf_dirs.push(subdir.unwrap().to_owned());
+        leaf_dirs.push(subdir.unwrap_or_default().to_owned());
     } else {
         for child_dir in child_dirs {
             let mut next_subdir = PathBuf::new();
@@ -159,11 +186,14 @@ pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Vec<Strin
                 next_subdir.push(curr);
             }
             next_subdir.push(child_dir);
-            let curr_leaf_dir = get_leaf_dirs(storage, Some(next_subdir.to_str().unwrap())).await;
+            let next_subdir = next_subdir
+                .to_str()
+                .ok_or(anyhow!("Failed to convert path: {:?}", next_subdir))?;
+            let curr_leaf_dir = get_leaf_dirs(storage, Some(next_subdir)).await?;
             leaf_dirs.extend(curr_leaf_dir);
         }
     }
-    leaf_dirs
+    Ok(leaf_dirs)
 }
 
 #[cfg(test)]
@@ -187,7 +217,8 @@ mod tests {
         )
         .unwrap();
         let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
-        let first_level_dirs: HashSet<String> = storage.list_dirs(None).await.into_iter().collect();
+        let first_level_dirs: HashSet<String> =
+            storage.list_dirs(None).await.unwrap().into_iter().collect();
         assert_eq!(
             first_level_dirs,
             vec![".hoodie", "part1", "part2", "part3"]
@@ -195,9 +226,9 @@ mod tests {
                 .map(String::from)
                 .collect()
         );
-        let second_level_dirs: Vec<String> = storage.list_dirs(Some("part2")).await;
+        let second_level_dirs: Vec<String> = storage.list_dirs(Some("part2")).await.unwrap();
         assert_eq!(second_level_dirs, vec!["part22"]);
-        let no_dirs = storage.list_dirs(Some("part1")).await;
+        let no_dirs = storage.list_dirs(Some("part1")).await.unwrap();
         assert!(no_dirs.is_empty());
     }
 
@@ -211,6 +242,7 @@ mod tests {
         let first_level_dirs: HashSet<ObjPath> = storage
             .list_dirs_as_obj_paths(None)
             .await
+            .unwrap()
             .into_iter()
             .collect();
         let expected_paths: HashSet<ObjPath> = vec![".hoodie", "part1", "part2", "part3"]
@@ -230,7 +262,12 @@ mod tests {
         )
         .unwrap();
         let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
-        let file_info_1: Vec<FileInfo> = storage.list_files(None).await.into_iter().collect();
+        let file_info_1: Vec<FileInfo> = storage
+            .list_files(None)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect();
         assert_eq!(
             file_info_1,
             vec![FileInfo {
@@ -242,6 +279,7 @@ mod tests {
         let file_info_2: Vec<FileInfo> = storage
             .list_files(Some("part1"))
             .await
+            .unwrap()
             .into_iter()
             .collect();
         assert_eq!(
@@ -259,6 +297,7 @@ mod tests {
         let file_info_3: Vec<FileInfo> = storage
             .list_files(Some("part2/part22"))
             .await
+            .unwrap()
             .into_iter()
             .collect();
         assert_eq!(
@@ -282,19 +321,33 @@ mod tests {
         )
         .unwrap();
         let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
-        let leaf_dirs = get_leaf_dirs(&storage, None).await;
+        let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
         assert_eq!(
             leaf_dirs,
             vec![".hoodie", "part1", "part2/part22", "part3/part32/part33"]
         );
     }
 
+    #[tokio::test]
+    async fn use_storage_to_get_leaf_dirs_for_leaf_dir() {
+        let base_url =
+            Url::from_directory_path(canonicalize(Path::new("fixtures/leaf_dir")).unwrap())
+                .unwrap();
+        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
+        assert_eq!(
+            leaf_dirs,
+            vec![""],
+            "Listing a leaf dir should get the relative path to itself."
+        );
+    }
+
     #[tokio::test]
     async fn storage_get_file_info() {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
         let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
-        let file_info = storage.get_file_info("a.parquet").await;
+        let file_info = storage.get_file_info("a.parquet").await.unwrap();
         assert_eq!(file_info.name, "a.parquet");
         assert_eq!(
             file_info.uri,
@@ -308,8 +361,7 @@ mod tests {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
         let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
-        let file_data = storage.get_parquet_file_data("a.parquet").await;
-        assert_eq!(file_data.len(), 1);
-        assert_eq!(file_data.first().unwrap().num_rows(), 5);
+        let file_data = storage.get_parquet_file_data("a.parquet").await.unwrap();
+        assert_eq!(file_data.num_rows(), 5);
     }
 }
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index b7cd77c..ad4f5f5 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -58,13 +58,13 @@ impl FileSystemView {
     async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> {
         let top_level_dirs: Vec<String> = storage
             .list_dirs(None)
-            .await
+            .await?
             .into_iter()
             .filter(|dir| dir != ".hoodie")
             .collect();
         let mut partition_paths = Vec::new();
         for dir in top_level_dirs {
-            partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await);
+            partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await?);
         }
         if partition_paths.is_empty() {
             partition_paths.push("".to_string())
@@ -94,7 +94,7 @@
     ) -> Result<Vec<FileGroup>> {
         let file_info: Vec<FileInfo> = storage
             .list_files(Some(partition_path))
-            .await
+            .await?
             .into_iter()
             .filter(|f| f.name.ends_with(".parquet"))
             .collect();
@@ -152,13 +152,10 @@ impl FileSystemView {
     pub async fn read_file_slice_by_path_unchecked(
         &self,
         relative_path: &str,
-    ) -> Result<Vec<RecordBatch>> {
-        Ok(self.storage.get_parquet_file_data(relative_path).await)
+    ) -> Result<RecordBatch> {
+        self.storage.get_parquet_file_data(relative_path).await
     }
-    pub async fn read_file_slice_unchecked(
-        &self,
-        file_slice: &FileSlice,
-    ) -> Result<Vec<RecordBatch>> {
+    pub async fn read_file_slice_unchecked(&self, file_slice: &FileSlice) -> Result<RecordBatch> {
         self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
             .await
     }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 52daf12..d1be74f 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -85,7 +85,7 @@ impl Table {
         storage_options: Arc<HashMap<String, String>>,
     ) -> Result<HashMap<String, String>> {
         let storage = Storage::new(base_url, storage_options)?;
-        let data = storage.get_file_data(".hoodie/hoodie.properties").await;
+        let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
         let cursor = std::io::Cursor::new(data);
         let lines = BufReader::new(cursor).lines();
         let mut properties: HashMap<String, String> = HashMap::new();
@@ -146,7 +146,7 @@ impl Table {
         let mut batches = Vec::new();
         for f in file_slices {
             match self.file_system_view.read_file_slice_unchecked(&f).await {
-                Ok(batch) => batches.extend(batch),
+                Ok(batch) => batches.push(batch),
                 Err(e) => return Err(anyhow!("Failed to read file slice {:?} - {}", f, e)),
             }
         }
@@ -162,7 +162,7 @@ impl Table {
         Ok(file_paths)
     }
 
-    pub async fn read_file_slice_by_path(&self, relative_path: &str) -> Result<Vec<RecordBatch>> {
+    pub async fn read_file_slice_by_path(&self, relative_path: &str) -> Result<RecordBatch> {
         self.file_system_view
             .read_file_slice_by_path_unchecked(relative_path)
             .await
@@ -329,9 +329,8 @@ mod tests {
         )
         .await
         .unwrap();
-        assert_eq!(batches.len(), 1);
-        assert_eq!(batches.first().unwrap().num_rows(), 4);
-        assert_eq!(batches.first().unwrap().num_columns(), 21);
+        assert_eq!(batches.num_rows(), 4);
+        assert_eq!(batches.num_columns(), 21);
     }
 
     #[tokio::test]
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index 70fc6ee..3a68387 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -98,7 +98,7 @@ impl Timeline {
 
     async fn load_completed_commit_instants(storage: &Storage) -> Result<Vec<Instant>> {
         let mut completed_commits = Vec::new();
-        for file_info in storage.list_files(Some(".hoodie")).await {
+        for file_info in storage.list_files(Some(".hoodie")).await? {
             let (file_stem, file_ext) = split_filename(file_info.name.as_str())?;
             if file_ext == "commit" {
                 completed_commits.push(Instant {
@@ -128,7 +128,7 @@ impl Timeline {
             "Failed to get commit file path for instant: {:?}",
             instant
         ))?;
-        let bytes = self.storage.get_file_data(relative_path).await;
+        let bytes = self.storage.get_file_data(relative_path).await?;
         let json: Value = serde_json::from_slice(&bytes)?;
         let commit_metadata = json
             .as_object()
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 67dd0cc..b91b492 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -49,7 +49,7 @@ class BindingHudiTable:
 
     def get_file_slices(self) -> List[HudiFileSlice]: ...
 
-    def read_file_slice(self, base_file_relative_path) -> List["pyarrow.RecordBatch"]: ...
+    def read_file_slice(self, base_file_relative_path) -> pyarrow.RecordBatch: ...
 
     def read_snapshot(self) -> List["pyarrow.RecordBatch"]: ...
diff --git a/python/hudi/table.py b/python/hudi/table.py
index 943f423..def024d 100644
--- a/python/hudi/table.py
+++ b/python/hudi/table.py
@@ -47,7 +47,7 @@ class HudiTable:
     def get_file_slices(self) -> List[HudiFileSlice]:
         return self._table.get_file_slices()
 
-    def read_file_slice(self, base_file_relative_path: str) -> List["pyarrow.RecordBatch"]:
+    def read_file_slice(self, base_file_relative_path: str) -> "pyarrow.RecordBatch":
         return self._table.read_file_slice(base_file_relative_path)
 
     def read_snapshot(self) -> List["pyarrow.RecordBatch"]:
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index aec6d70..883958c 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -36,15 +36,15 @@ def test_sample_table(get_sample_table):
     assert len(file_slices) == 5
     assert set(f.commit_time for f in file_slices) == {'20240402123035233', '20240402144910683'}
     assert all(f.num_records == 1 for f in file_slices)
-    file_slice_paths = [f.base_file_relative_path() for f in file_slices]
+    file_slice_paths = [f.base_file_relative_path for f in file_slices]
     assert set(file_slice_paths) == {'chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet',
                                      'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet',
                                      'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet',
                                      'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet',
                                      'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'}
-    batches = table.read_file_slice(file_slice_paths[0])
-    t = pa.Table.from_batches(batches)
+    batch = table.read_file_slice(file_slice_paths[0])
+    t = pa.Table.from_batches([batch])
     assert t.num_rows == 1
     assert t.num_columns == 11
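Note for reviewers skimming the diff: the recurring change is replacing panicking unwrap() calls with anyhow error propagation. Options become Results via ok_or(anyhow!(...)), fallible calls propagate with the ? operator, and context(...) attaches a readable message at the call site. Below is a minimal, self-contained sketch of that pattern, assuming only the anyhow crate; parent_dir is a hypothetical helper written for illustration, not code from this commit.

    use anyhow::{anyhow, Context, Result};
    use std::path::Path;

    // Hypothetical helper mirroring the diff's pattern: an Option is turned
    // into a Result with ok_or(anyhow!(...)), so the caller decides how to
    // handle the failure instead of hitting an unwrap() panic.
    fn parent_dir(path: &Path) -> Result<String> {
        let parent = path
            .parent()
            .ok_or(anyhow!("Failed to get parent for {:?}", path))?;
        let s = parent
            .to_str()
            .ok_or(anyhow!("Failed to convert path: {:?}", parent))?;
        Ok(s.to_string())
    }

    fn main() -> Result<()> {
        // The ? operator propagates errors upward, and context() prepends a
        // human-readable message to the error chain.
        let dir = parent_dir(Path::new("part2/part22/a.parquet"))
            .context("Failed to resolve partition dir")?;
        println!("{dir}");
        Ok(())
    }

With this style, a failure surfaces as a formatted error chain at the top level rather than a panic inside storage internals, which is the same trade the refactored list_dirs, list_files, and get_leaf_dirs make throughout this commit.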