This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push: new e15174d feat: implement Rust and Python APIs to read file slices (#28) e15174d is described below commit e15174d505ab22407007fdc009adce0d5fd6cb7d Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com> AuthorDate: Wed Jun 26 23:29:02 2024 -0500 feat: implement Rust and Python APIs to read file slices (#28) --- Cargo.toml | 2 +- crates/core/Cargo.toml | 1 + crates/core/src/file_group/mod.rs | 65 ++++++--- crates/core/src/lib.rs | 2 +- crates/core/src/{lib.rs => storage/file_info.rs} | 16 +- crates/core/src/{lib.rs => storage/file_stats.rs} | 14 +- crates/core/src/storage/mod.rs | 145 ++++++++++-------- .../src/storage/{file_metadata.rs => utils.rs} | 37 ++--- crates/core/src/table/fs_view.rs | 95 +++++++++--- crates/core/src/table/mod.rs | 162 +++++++++++++++------ crates/core/src/timeline/mod.rs | 38 ++--- crates/datafusion/src/lib.rs | 10 +- python/Cargo.toml | 16 +- python/hudi/__init__.py | 1 + python/hudi/_internal.pyi | 26 +++- python/hudi/{_internal.pyi => _utils.py} | 17 +-- python/hudi/table.py | 31 +++- python/pyproject.toml | 15 ++ python/src/lib.rs | 75 ++++++++-- python/{hudi => tests}/__init__.py | 4 - python/{hudi/table.py => tests/conftest.py} | 26 ++-- python/tests/test_table_read.py | 53 +++++++ 22 files changed, 595 insertions(+), 256 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 28340c9..6d21195 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ rust-version = "1.75.0" [workspace.dependencies] # arrow -arrow = { version = "50" } +arrow = { version = "50", features = ["pyarrow"] } arrow-arith = { version = "50" } arrow-array = { version = "50", features = ["chrono-tz"] } arrow-buffer = { version = "50" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index bdf05bb..98c12ab 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -67,6 +67,7 @@ url = { workspace = true } async-recursion = { workspace = true } async-trait = { workspace = true } tokio = { workspace = true } +futures = { workspace = true } # test tempfile = "3.10.1" diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs index b0d791f..ec2e171 100644 --- a/crates/core/src/file_group/mod.rs +++ b/crates/core/src/file_group/mod.rs @@ -21,33 +21,50 @@ use std::collections::BTreeMap; use std::fmt; use std::fmt::Formatter; -use crate::storage::file_metadata::FileMetadata; +use crate::storage::file_info::FileInfo; +use crate::storage::file_stats::FileStats; use anyhow::{anyhow, Result}; #[derive(Clone, Debug)] pub struct BaseFile { pub file_group_id: String, pub commit_time: String, - pub metadata: Option<FileMetadata>, + pub info: FileInfo, + pub stats: Option<FileStats>, } impl BaseFile { - pub fn new(file_name: &str) -> Self { - let (name, _) = file_name.rsplit_once('.').unwrap(); + fn parse_file_name(file_name: &str) -> Result<(String, String)> { + let err_msg = format!("Failed to parse file name '{}' for base file.", file_name); + let (name, _) = file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?; let parts: Vec<&str> = name.split('_').collect(); - let file_group_id = parts[0].to_owned(); - let commit_time = parts[2].to_owned(); - Self { + let file_group_id = parts.first().ok_or(anyhow!(err_msg.clone()))?.to_string(); + let commit_time = parts.get(2).ok_or(anyhow!(err_msg.clone()))?.to_string(); + Ok((file_group_id, commit_time)) + } + + pub fn from_file_name(file_name: &str) -> Result<Self> { + let (file_group_id, commit_time) = 
Self::parse_file_name(file_name)?; + Ok(Self { file_group_id, commit_time, - metadata: None, - } + info: FileInfo::default(), + stats: None, + }) } - pub fn from_file_metadata(file_metadata: FileMetadata) -> Self { - let mut base_file = Self::new(file_metadata.name.as_str()); - base_file.metadata = Some(file_metadata); - base_file + pub fn from_file_info(info: FileInfo) -> Result<Self> { + let (file_group_id, commit_time) = Self::parse_file_name(&info.name)?; + Ok(Self { + file_group_id, + commit_time, + info, + stats: None, + }) + } + + pub fn populate_stats(&mut self, stats: FileStats) { + self.stats = Some(stats) } } @@ -58,11 +75,8 @@ pub struct FileSlice { } impl FileSlice { - pub fn base_file_path(&self) -> Option<&str> { - match &self.base_file.metadata { - None => None, - Some(file_metadata) => Some(file_metadata.path.as_str()), - } + pub fn base_file_path(&self) -> &str { + self.base_file.info.uri.as_str() } pub fn file_group_id(&self) -> &str { @@ -102,9 +116,9 @@ impl FileGroup { } } - #[allow(dead_code)] - pub fn add_base_file_from_name(&mut self, file_name: &str) -> Result<&Self> { - let base_file = BaseFile::new(file_name); + #[cfg(test)] + fn add_base_file_from_name(&mut self, file_name: &str) -> Result<&Self> { + let base_file = BaseFile::from_file_name(file_name)?; self.add_base_file(base_file) } @@ -131,6 +145,10 @@ impl FileGroup { pub fn get_latest_file_slice(&self) -> Option<&FileSlice> { return self.file_slices.values().next_back(); } + + pub fn get_latest_file_slice_mut(&mut self) -> Option<&mut FileSlice> { + return self.file_slices.values_mut().next_back(); + } } #[cfg(test)] @@ -139,9 +157,10 @@ mod tests { #[test] fn create_a_base_file_successfully() { - let base_file = BaseFile::new( + let base_file = BaseFile::from_file_name( "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet", - ); + ) + .unwrap(); assert_eq!( base_file.file_group_id, "5a226868-2934-4f84-a16f-55124630c68d-0" diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 0d56755..d2c53ee 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -19,7 +19,7 @@ use crate::table::Table; -mod file_group; +pub mod file_group; pub mod table; pub type HudiTable = Table; mod storage; diff --git a/crates/core/src/lib.rs b/crates/core/src/storage/file_info.rs similarity index 79% copy from crates/core/src/lib.rs copy to crates/core/src/storage/file_info.rs index 0d56755..4bd178d 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/storage/file_info.rs @@ -17,15 +17,9 @@ * under the License. */ -use crate::table::Table; - -mod file_group; -pub mod table; -pub type HudiTable = Table; -mod storage; -pub mod test_utils; -mod timeline; - -pub fn crate_version() -> &'static str { - env!("CARGO_PKG_VERSION") +#[derive(Clone, Debug, Default)] +pub struct FileInfo { + pub uri: String, + pub name: String, + pub size: usize, } diff --git a/crates/core/src/lib.rs b/crates/core/src/storage/file_stats.rs similarity index 79% copy from crates/core/src/lib.rs copy to crates/core/src/storage/file_stats.rs index 0d56755..ec63c14 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/storage/file_stats.rs @@ -17,15 +17,7 @@ * under the License. 
*/ -use crate::table::Table; - -mod file_group; -pub mod table; -pub type HudiTable = Table; -mod storage; -pub mod test_utils; -mod timeline; - -pub fn crate_version() -> &'static str { - env!("CARGO_PKG_VERSION") +#[derive(Clone, Debug, Default)] +pub struct FileStats { + pub num_records: i64, } diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 3de77a4..c8b7b34 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -21,8 +21,10 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; +use arrow::record_batch::RecordBatch; use async_recursion::async_recursion; use bytes::Bytes; +use futures::StreamExt; use object_store::path::Path as ObjPath; use object_store::{parse_url_opts, ObjectStore}; use parquet::arrow::async_reader::ParquetObjectReader; @@ -30,9 +32,12 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder; use parquet::file::metadata::ParquetMetaData; use url::Url; -use crate::storage::file_metadata::FileMetadata; +use crate::storage::file_info::FileInfo; +use crate::storage::utils::join_url_segments; -pub(crate) mod file_metadata; +pub(crate) mod file_info; +pub(crate) mod file_stats; +pub(crate) mod utils; #[allow(dead_code)] pub struct Storage { @@ -41,10 +46,8 @@ pub struct Storage { options: HashMap<String, String>, } -#[allow(dead_code)] impl Storage { - pub fn new(base_uri: &str, options: HashMap<String, String>) -> Box<Storage> { - let base_url = Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap(); + pub fn new(base_url: Url, options: HashMap<String, String>) -> Box<Storage> { let object_store = parse_url_opts(&base_url, &options).unwrap().0; Box::from(Storage { base_url, @@ -53,76 +56,88 @@ impl Storage { }) } - pub async fn get_file_metadata(&self, relative_path: &str) -> FileMetadata { - let mut obj_url = self.base_url.clone(); - obj_url.path_segments_mut().unwrap().push(relative_path); + #[allow(dead_code)] + pub async fn get_file_info(&self, relative_path: &str) -> FileInfo { + let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap(); let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap(); let meta = self.object_store.head(&obj_path).await.unwrap(); - FileMetadata { - path: meta.location.to_string(), + FileInfo { + uri: obj_url.to_string(), name: obj_path.filename().unwrap().to_string(), size: meta.size, - num_records: None, } } pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> ParquetMetaData { - let mut obj_url = self.base_url.clone(); - obj_url.path_segments_mut().unwrap().push(relative_path); + let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap(); let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap(); - let meta = self.object_store.head(&obj_path).await.unwrap(); - let reader = ParquetObjectReader::new(self.object_store.clone(), meta); + let obj_store = self.object_store.clone(); + let meta = obj_store.head(&obj_path).await.unwrap(); + let reader = ParquetObjectReader::new(obj_store, meta); let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); builder.metadata().as_ref().to_owned() } pub async fn get_file_data(&self, relative_path: &str) -> Bytes { - let mut obj_url = self.base_url.clone(); - obj_url.path_segments_mut().unwrap().push(relative_path); + let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap(); let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap(); let result = self.object_store.get(&obj_path).await.unwrap(); 
result.bytes().await.unwrap() } + pub async fn get_parquet_file_data(&self, relative_path: &str) -> Vec<RecordBatch> { + let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap(); + let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap(); + let obj_store = self.object_store.clone(); + let meta = obj_store.head(&obj_path).await.unwrap(); + let reader = ParquetObjectReader::new(obj_store, meta); + let stream = ParquetRecordBatchStreamBuilder::new(reader) + .await + .unwrap() + .build() + .unwrap(); + stream + .collect::<Vec<_>>() + .await + .into_iter() + .map(|r| r.unwrap()) + .collect() + } + pub async fn list_dirs(&self, subdir: Option<&str>) -> Vec<String> { - self.list_dirs_as_paths(subdir) + self.list_dirs_as_obj_paths(subdir) .await .into_iter() .map(|p| p.filename().unwrap().to_string()) .collect() } - pub async fn list_dirs_as_paths(&self, subdir: Option<&str>) -> Vec<ObjPath> { - let mut prefix_url = self.base_url.clone(); - if let Some(subdir) = subdir { - prefix_url.path_segments_mut().unwrap().push(subdir); - } - let prefix = ObjPath::from_url_path(prefix_url.path()).unwrap(); + async fn list_dirs_as_obj_paths(&self, subdir: Option<&str>) -> Vec<ObjPath> { + let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap(); + let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap(); self.object_store - .list_with_delimiter(Some(&prefix)) + .list_with_delimiter(Some(&prefix_path)) .await .unwrap() .common_prefixes } - pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileMetadata> { - let mut prefix_url = self.base_url.clone(); - if let Some(subdir) = subdir { - prefix_url.path_segments_mut().unwrap().push(subdir); - } - let prefix = ObjPath::from_url_path(prefix_url.path()).unwrap(); + pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileInfo> { + let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap(); + let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap(); self.object_store - .list_with_delimiter(Some(&prefix)) + .list_with_delimiter(Some(&prefix_path)) .await .unwrap() .objects .into_iter() - .map(|obj_meta| { - FileMetadata::new( - obj_meta.location.to_string(), - obj_meta.location.filename().unwrap().to_string(), - obj_meta.size, - ) + .map(|obj_meta| FileInfo { + uri: prefix_url + .join(obj_meta.location.filename().unwrap()) + .unwrap() + .to_string(), + name: obj_meta.location.filename().unwrap().to_string(), + size: obj_meta.size, }) .collect() } @@ -157,6 +172,7 @@ mod tests { use object_store::path::Path as ObjPath; use url::Url; + use crate::storage::utils::join_url_segments; use crate::storage::{get_leaf_dirs, Storage}; #[tokio::test] @@ -165,7 +181,7 @@ mod tests { canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(), ) .unwrap(); - let storage = Storage::new(base_url.path(), HashMap::new()); + let storage = Storage::new(base_url, HashMap::new()); let first_level_dirs: HashSet<String> = storage.list_dirs(None).await.into_iter().collect(); assert_eq!( first_level_dirs, @@ -186,12 +202,18 @@ mod tests { canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(), ) .unwrap(); - let storage = Storage::new(base_url.path(), HashMap::new()); - let first_level_dirs: HashSet<ObjPath> = - storage.list_dirs_as_paths(None).await.into_iter().collect(); + let storage = Storage::new(base_url, HashMap::new()); + let first_level_dirs: HashSet<ObjPath> = storage + .list_dirs_as_obj_paths(None) + .await + .into_iter() + .collect(); 
let expected_paths: HashSet<ObjPath> = vec![".hoodie", "part1", "part2", "part3"] .into_iter() - .map(|dir| ObjPath::from_url_path(base_url.join(dir).unwrap().path()).unwrap()) + .map(|dir| { + ObjPath::from_url_path(join_url_segments(&storage.base_url, &[dir]).unwrap().path()) + .unwrap() + }) .collect(); assert_eq!(first_level_dirs, expected_paths); } @@ -202,26 +224,26 @@ mod tests { canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(), ) .unwrap(); - let storage = Storage::new(base_url.path(), HashMap::new()); + let storage = Storage::new(base_url, HashMap::new()); let file_names_1: Vec<String> = storage .list_files(None) .await .into_iter() - .map(|file_metadata| file_metadata.name) + .map(|file_info| file_info.name) .collect(); assert_eq!(file_names_1, vec!["a.parquet"]); let file_names_2: Vec<String> = storage .list_files(Some("part1")) .await .into_iter() - .map(|file_metadata| file_metadata.name) + .map(|file_info| file_info.name) .collect(); assert_eq!(file_names_2, vec!["b.parquet"]); let file_names_3: Vec<String> = storage .list_files(Some("part2/part22")) .await .into_iter() - .map(|file_metadata| file_metadata.name) + .map(|file_info| file_info.name) .collect(); assert_eq!(file_names_3, vec!["c.parquet"]); } @@ -232,7 +254,7 @@ mod tests { canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(), ) .unwrap(); - let storage = Storage::new(base_url.path(), HashMap::new()); + let storage = Storage::new(base_url, HashMap::new()); let leaf_dirs = get_leaf_dirs(&storage, None).await; assert_eq!( leaf_dirs, @@ -241,19 +263,26 @@ mod tests { } #[tokio::test] - async fn get_file_metadata() { + async fn storage_get_file_info() { let base_url = Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap(); - let storage = Storage::new(base_url.path(), HashMap::new()); - let file_metadata = storage.get_file_metadata("a.parquet").await; - assert_eq!(file_metadata.name, "a.parquet"); + let storage = Storage::new(base_url, HashMap::new()); + let file_info = storage.get_file_info("a.parquet").await; + assert_eq!(file_info.name, "a.parquet"); assert_eq!( - file_metadata.path, - ObjPath::from_url_path(base_url.join("a.parquet").unwrap().path()) - .unwrap() - .to_string() + file_info.uri, + storage.base_url.join("a.parquet").unwrap().to_string() ); - assert_eq!(file_metadata.size, 866); - assert_eq!(file_metadata.num_records, None); + assert_eq!(file_info.size, 866); + } + + #[tokio::test] + async fn storage_get_parquet_file_data() { + let base_url = + Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap(); + let storage = Storage::new(base_url, HashMap::new()); + let file_data = storage.get_parquet_file_data("a.parquet").await; + assert_eq!(file_data.len(), 1); + assert_eq!(file_data.first().unwrap().num_rows(), 5); } } diff --git a/crates/core/src/storage/file_metadata.rs b/crates/core/src/storage/utils.rs similarity index 74% rename from crates/core/src/storage/file_metadata.rs rename to crates/core/src/storage/utils.rs index a7b8f28..cf81dc0 100644 --- a/crates/core/src/storage/file_metadata.rs +++ b/crates/core/src/storage/utils.rs @@ -17,28 +17,9 @@ * under the License. 
*/ -use anyhow::anyhow; -use anyhow::Result; +use anyhow::{anyhow, Result}; use std::path::Path; - -#[derive(Clone, Debug, Default, Eq, PartialEq)] -pub struct FileMetadata { - pub path: String, - pub name: String, - pub size: usize, - pub num_records: Option<usize>, -} - -impl FileMetadata { - pub fn new(path: String, name: String, size: usize) -> FileMetadata { - FileMetadata { - path, - name, - size, - num_records: None, - } - } -} +use url::{ParseError, Url}; pub fn split_filename(filename: &str) -> Result<(String, String)> { let path = Path::new(filename); @@ -57,3 +38,17 @@ pub fn split_filename(filename: &str) -> Result<(String, String)> { Ok((stem, extension)) } + +pub fn join_url_segments(base_url: &Url, segments: &[&str]) -> Result<Url> { + let mut url = base_url.clone(); + + if url.path().ends_with('/') { + url.path_segments_mut().unwrap().pop(); + } + + url.path_segments_mut() + .map_err(|_| ParseError::RelativeUrlWithoutBase)? + .extend(segments); + + Ok(url) +} diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs index 2f3b981..5f9cf0f 100644 --- a/crates/core/src/table/fs_view.rs +++ b/crates/core/src/table/fs_view.rs @@ -18,30 +18,33 @@ */ use std::collections::HashMap; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use anyhow::{anyhow, Result}; +use arrow::record_batch::RecordBatch; +use url::Url; use crate::file_group::{BaseFile, FileGroup, FileSlice}; -use crate::storage::file_metadata::FileMetadata; +use crate::storage::file_info::FileInfo; +use crate::storage::file_stats::FileStats; use crate::storage::{get_leaf_dirs, Storage}; #[derive(Clone, Debug)] pub struct FileSystemView { - pub base_path: PathBuf, + pub base_url: Url, partition_to_file_groups: HashMap<String, Vec<FileGroup>>, } impl FileSystemView { - pub fn new(base_path: &Path) -> Self { + pub fn new(base_url: Url) -> Self { FileSystemView { - base_path: base_path.to_path_buf(), + base_url, partition_to_file_groups: HashMap::new(), } } async fn get_partition_paths(&self) -> Result<Vec<String>> { - let storage = Storage::new(self.base_path.to_str().unwrap(), HashMap::new()); + let storage = Storage::new(self.base_url.clone(), HashMap::new()); let top_level_dirs: Vec<String> = storage .list_dirs(None) .await @@ -56,16 +59,16 @@ impl FileSystemView { } async fn get_file_groups(&self, partition_path: &str) -> Result<Vec<FileGroup>> { - let storage = Storage::new(self.base_path.to_str().unwrap(), HashMap::new()); - let file_metadata: Vec<FileMetadata> = storage + let storage = Storage::new(self.base_url.clone(), HashMap::new()); + let file_info: Vec<FileInfo> = storage .list_files(Some(partition_path)) .await .into_iter() .filter(|f| f.name.ends_with(".parquet")) .collect(); let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new(); - for f in file_metadata { - let base_file = BaseFile::from_file_metadata(f); + for f in file_info { + let base_file = BaseFile::from_file_info(f)?; let fg_id = &base_file.file_group_id; fg_id_to_base_files .entry(fg_id.to_owned()) @@ -84,8 +87,7 @@ impl FileSystemView { Ok(file_groups) } - pub fn get_latest_file_slices(&mut self) -> Vec<&FileSlice> { - let mut file_slices = Vec::new(); + pub fn load_file_groups(&mut self) { let fs_view = self.clone(); let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -96,6 +98,10 @@ impl FileSystemView { for (k, v) in result { self.partition_to_file_groups.insert(k, v); } + } + + pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> { + let mut file_slices = Vec::new(); 
for fgs in self.partition_to_file_groups.values() { for fg in fgs { if let Some(file_slice) = fg.get_latest_file_slice() { @@ -105,6 +111,52 @@ impl FileSystemView { } file_slices } + + pub fn get_latest_file_slices_with_stats(&mut self) -> Vec<&mut FileSlice> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let mut file_slices = Vec::new(); + let file_groups = &mut self.partition_to_file_groups.values_mut(); + for fgs in file_groups { + for fg in fgs { + if let Some(file_slice) = fg.get_latest_file_slice_mut() { + let wrapper = async { load_file_slice_stats(&self.base_url, file_slice).await }; + let _ = rt.block_on(wrapper); + file_slices.push(file_slice) + } + } + } + file_slices + } + + pub fn read_file_slice(&self, relative_path: &str) -> Vec<RecordBatch> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let storage = Storage::new(self.base_url.clone(), HashMap::new()); + let wrapper = async { storage.get_parquet_file_data(relative_path).await }; + rt.block_on(wrapper) + } +} + +async fn load_file_slice_stats(base_url: &Url, file_slice: &mut FileSlice) -> Result<()> { + let base_file = &mut file_slice.base_file; + if base_file.stats.is_none() { + let storage = Storage::new(base_url.clone(), HashMap::new()); + let ptn = file_slice.partition_path.clone(); + let mut relative_path = PathBuf::from(ptn.unwrap_or("".to_string())); + let base_file_name = &base_file.info.name; + relative_path.push(base_file_name); + let parquet_meta = storage + .get_parquet_file_metadata(relative_path.to_str().unwrap()) + .await; + let num_records = parquet_meta.file_metadata().num_rows(); + base_file.populate_stats(FileStats { num_records }); + } + Ok(()) } async fn get_partitions_and_file_groups( @@ -133,17 +185,20 @@ async fn get_partitions_and_file_groups( #[cfg(test)] mod tests { use std::collections::HashSet; + use std::fs::canonicalize; use std::path::Path; - use crate::test_utils::extract_test_table; + use url::Url; use crate::table::fs_view::FileSystemView; + use crate::test_utils::extract_test_table; #[tokio::test] async fn get_partition_paths() { - let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip"); - let target_table_path = extract_test_table(fixture_path); - let fs_view = FileSystemView::new(&target_table_path); + let fixture_path = + canonicalize(Path::new("fixtures/table/0.x_cow_partitioned.zip")).unwrap(); + let base_url = Url::from_file_path(extract_test_table(&fixture_path)).unwrap(); + let fs_view = FileSystemView::new(base_url); let partition_paths = fs_view.get_partition_paths().await.unwrap(); let partition_path_set: HashSet<&str> = HashSet::from_iter(partition_paths.iter().map(|p| p.as_str())); @@ -155,9 +210,11 @@ mod tests { #[test] fn get_latest_file_slices() { - let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip"); - let target_table_path = extract_test_table(fixture_path); - let mut fs_view = FileSystemView::new(&target_table_path); + let fixture_path = + canonicalize(Path::new("fixtures/table/0.x_cow_partitioned.zip")).unwrap(); + let base_url = Url::from_file_path(extract_test_table(&fixture_path)).unwrap(); + let mut fs_view = FileSystemView::new(base_url); + fs_view.load_file_groups(); let file_slices = fs_view.get_latest_file_slices(); assert_eq!(file_slices.len(), 5); let mut fg_ids = Vec::new(); diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 9191aa4..681ef05 100644 --- a/crates/core/src/table/mod.rs +++ 
b/crates/core/src/table/mod.rs @@ -17,16 +17,18 @@ * under the License. */ -use anyhow::Result; use std::collections::HashMap; -use std::fs::File; use std::io::{BufRead, BufReader}; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::str::FromStr; +use anyhow::Result; +use arrow::record_batch::RecordBatch; use arrow_schema::SchemaRef; +use url::Url; use crate::file_group::FileSlice; +use crate::storage::Storage; use crate::table::config::BaseFileFormat; use crate::table::config::{ConfigKey, TableType}; use crate::table::fs_view::FileSystemView; @@ -38,26 +40,44 @@ mod fs_view; mod metadata; #[derive(Debug, Clone)] +#[allow(dead_code)] pub struct Table { - pub base_path: PathBuf, + pub base_url: Url, pub props: HashMap<String, String>, + pub file_system_view: Option<FileSystemView>, + pub storage_options: HashMap<String, String>, } impl Table { - pub fn new(table_base_path: &str) -> Self { - let base_path = PathBuf::from(table_base_path); - let props_path = base_path.join(".hoodie").join("hoodie.properties"); - match Self::load_properties(props_path.as_path()) { - Ok(props) => Self { base_path, props }, + pub fn new(base_uri: &str, storage_options: HashMap<String, String>) -> Self { + let base_url = Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap(); + match Self::load_properties(&base_url, ".hoodie/hoodie.properties", &storage_options) { + Ok(props) => Self { + base_url, + props, + file_system_view: None, + storage_options, + }, Err(e) => { panic!("Failed to load table properties: {}", e) } } } - fn load_properties(path: &Path) -> Result<HashMap<String, String>> { - let file = File::open(path)?; - let reader = BufReader::new(file); + fn load_properties( + base_url: &Url, + props_path: &str, + storage_options: &HashMap<String, String>, + ) -> Result<HashMap<String, String>> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let storage = Storage::new(base_url.clone(), storage_options.clone()); + let get_data = async { storage.get_file_data(props_path).await }; + let data = rt.block_on(get_data); + let cursor = std::io::Cursor::new(data); + let reader = BufReader::new(cursor); let lines = reader.lines(); let mut properties: HashMap<String, String> = HashMap::new(); for line in lines { @@ -81,58 +101,70 @@ impl Table { } } - pub fn get_timeline(&self) -> Result<Timeline> { + #[cfg(test)] + fn get_timeline(&self) -> Result<Timeline> { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); - let f = async { Timeline::new(self.base_path.to_str().unwrap()).await }; - rt.block_on(f) + let init_timeline = async { Timeline::new(self.base_url.clone()).await }; + rt.block_on(init_timeline) } - pub fn schema(&self) -> SchemaRef { + pub fn get_latest_schema(&self) -> SchemaRef { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); - let f = async { Timeline::new(self.base_path.to_str().unwrap()).await }; - let timeline = rt.block_on(f); + let init_timeline = async { Timeline::new(self.base_url.clone()).await }; + let timeline = rt.block_on(init_timeline); match timeline { Ok(timeline) => { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); - let wrapper = async { timeline.get_latest_schema().await }; - let result = rt.block_on(wrapper); - match result { + let get_schema = async { timeline.get_latest_schema().await }; + match rt.block_on(get_schema) { Ok(schema) => SchemaRef::from(schema), - Err(e) => { - panic!("Failed 
to resolve table schema: {}", e) - } + Err(e) => panic!("Failed to resolve table schema: {}", e), } } - Err(e) => { - panic!("Failed to resolve table schema: {}", e) - } + Err(e) => panic!("Failed to resolve table schema: {}", e), } } - pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>> { + pub fn get_latest_file_slices(&mut self) -> Result<Vec<FileSlice>> { + if self.file_system_view.is_none() { + let mut new_fs_view = FileSystemView::new(self.base_url.clone()); + new_fs_view.load_file_groups(); + self.file_system_view = Some(new_fs_view); + } + + let fs_view = self.file_system_view.as_mut().unwrap(); + let mut file_slices = Vec::new(); - let mut fs_view = FileSystemView::new(self.base_path.as_path()); - for f in fs_view.get_latest_file_slices() { + for f in fs_view.get_latest_file_slices_with_stats() { file_slices.push(f.clone()); } Ok(file_slices) } - pub fn get_latest_file_paths(&self) -> Result<Vec<String>> { + pub fn read_file_slice(&mut self, relative_path: &str) -> Vec<RecordBatch> { + if self.file_system_view.is_none() { + let mut new_fs_view = FileSystemView::new(self.base_url.clone()); + new_fs_view.load_file_groups(); + self.file_system_view = Some(new_fs_view); + } + + let fs_view = self.file_system_view.as_ref().unwrap(); + fs_view.read_file_slice(relative_path) + } + + pub fn get_latest_file_paths(&mut self) -> Result<Vec<String>> { let mut file_paths = Vec::new(); for f in self.get_latest_file_slices()? { - if let Some(f) = f.base_file_path() { - file_paths.push(f.to_string()); - } + file_paths.push(f.base_file_path().to_string()); } Ok(file_paths) } @@ -178,7 +210,7 @@ impl ProvidesTableMetadata for Table { } fn location(&self) -> String { - self.base_path.to_str().unwrap().to_string() + self.base_url.path().to_string() } fn partition_fields(&self) -> Vec<String> { @@ -223,7 +255,10 @@ impl ProvidesTableMetadata for Table { #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::fs::canonicalize; use std::path::Path; + use url::Url; use crate::table::config::BaseFileFormat::Parquet; use crate::table::config::TableType::CopyOnWrite; @@ -231,20 +266,62 @@ mod tests { use crate::table::Table; use crate::test_utils::extract_test_table; + #[test] + fn hudi_table_get_latest_schema() { + let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip"); + let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap(); + let hudi_table = Table::new(base_url.path(), HashMap::new()); + let fields: Vec<String> = hudi_table + .get_latest_schema() + .all_fields() + .into_iter() + .map(|f| f.name().to_string()) + .collect(); + assert_eq!( + fields, + Vec::from([ + "_hoodie_commit_time", + "_hoodie_commit_seqno", + "_hoodie_record_key", + "_hoodie_partition_path", + "_hoodie_file_name", + "ts", + "uuid", + "rider", + "driver", + "fare", + "city" + ]) + ); + } + + #[test] + fn hudi_table_read_file_slice() { + let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip"); + let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap(); + let mut hudi_table = Table::new(base_url.path(), HashMap::new()); + let batches = hudi_table.read_file_slice( + "san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet", + ); + assert_eq!(batches.len(), 1); + assert_eq!(batches.first().unwrap().num_rows(), 1); + assert_eq!(batches.first().unwrap().num_columns(), 11); + } + #[test] fn hudi_table_get_latest_file_paths() { let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip"); - let 
target_table_path = extract_test_table(fixture_path); - let hudi_table = Table::new(target_table_path.to_str().unwrap()); + let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap(); + let mut hudi_table = Table::new(base_url.path(), HashMap::new()); assert_eq!(hudi_table.get_timeline().unwrap().instants.len(), 2); assert_eq!(hudi_table.get_latest_file_paths().unwrap().len(), 5); - println!("{}", hudi_table.schema()); } #[test] fn hudi_table_get_table_metadata() { - let fixture_path = Path::new("fixtures/table_metadata/sample_table_properties"); - let table = Table::new(fixture_path.to_str().unwrap()); + let base_path = + canonicalize(Path::new("fixtures/table_metadata/sample_table_properties")).unwrap(); + let table = Table::new(base_path.to_str().unwrap(), HashMap::new()); assert_eq!(table.base_file_format(), Parquet); assert_eq!(table.checksum(), 3761586722); assert_eq!(table.database_name(), "default"); @@ -256,10 +333,7 @@ mod tests { table.key_generator_class(), "org.apache.hudi.keygen.SimpleKeyGenerator" ); - assert_eq!( - table.location(), - "fixtures/table_metadata/sample_table_properties" - ); + assert_eq!(table.location(), base_path.to_str().unwrap()); assert_eq!(table.partition_fields(), vec!["city"]); assert_eq!(table.precombine_field(), "ts"); assert!(table.populates_meta_fields()); diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs index 3ac0a70..e7f8010 100644 --- a/crates/core/src/timeline/mod.rs +++ b/crates/core/src/timeline/mod.rs @@ -25,8 +25,9 @@ use anyhow::{anyhow, Result}; use arrow_schema::SchemaRef; use parquet::arrow::parquet_to_arrow_schema; use serde_json::{Map, Value}; +use url::Url; -use crate::storage::file_metadata::split_filename; +use crate::storage::utils::split_filename; use crate::storage::Storage; #[allow(dead_code)] @@ -60,24 +61,21 @@ impl Instant { #[derive(Debug, Clone)] pub struct Timeline { - pub base_path: String, + pub base_url: Url, pub instants: Vec<Instant>, } impl Timeline { - pub async fn new(base_path: &str) -> Result<Self> { - let instants = Self::load_completed_commit_instants(base_path).await?; - Ok(Self { - base_path: base_path.to_string(), - instants, - }) + pub async fn new(base_url: Url) -> Result<Self> { + let instants = Self::load_completed_commit_instants(&base_url).await?; + Ok(Self { base_url, instants }) } - async fn load_completed_commit_instants(base_path: &str) -> Result<Vec<Instant>> { - let storage = Storage::new(base_path, HashMap::new()); + async fn load_completed_commit_instants(base_url: &Url) -> Result<Vec<Instant>> { + let storage = Storage::new(base_url.clone(), HashMap::new()); let mut completed_commits = Vec::new(); - for file_metadata in storage.list_files(Some(".hoodie")).await { - let (file_stem, file_ext) = split_filename(file_metadata.name.as_str())?; + for file_info in storage.list_files(Some(".hoodie")).await { + let (file_stem, file_ext) = split_filename(file_info.name.as_str())?; if file_ext == "commit" { completed_commits.push(Instant { state: State::Completed, @@ -96,7 +94,7 @@ impl Timeline { Some(instant) => { let mut commit_file_path = PathBuf::from(".hoodie"); commit_file_path.push(instant.file_name()); - let storage = Storage::new(&self.base_path, HashMap::new()); + let storage = Storage::new(self.base_url.clone(), HashMap::new()); let bytes = storage .get_file_data(commit_file_path.to_str().unwrap()) .await; @@ -118,7 +116,7 @@ impl Timeline { if let Some((_, value)) = partition_to_write_stats.iter().next() { if let Some(first_value) = 
value.as_array().and_then(|arr| arr.first()) { if let Some(path) = first_value["path"].as_str() { - let storage = Storage::new(&self.base_path, HashMap::new()); + let storage = Storage::new(self.base_url.clone(), HashMap::new()); let parquet_meta = storage.get_parquet_file_metadata(path).await; let arrow_schema = parquet_to_arrow_schema( parquet_meta.file_metadata().schema_descr(), @@ -138,6 +136,8 @@ mod tests { use std::fs::canonicalize; use std::path::Path; + use url::Url; + use crate::test_utils::extract_test_table; use crate::timeline::{Instant, State, Timeline}; @@ -145,16 +145,18 @@ mod tests { async fn read_latest_schema() { let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip"); let target_table_path = extract_test_table(fixture_path); - let base_path = canonicalize(target_table_path).unwrap(); - let timeline = Timeline::new(base_path.to_str().unwrap()).await.unwrap(); + let base_url = Url::from_file_path(canonicalize(target_table_path).unwrap()).unwrap(); + let timeline = Timeline::new(base_url).await.unwrap(); let table_schema = timeline.get_latest_schema().await.unwrap(); assert_eq!(table_schema.fields.len(), 11) } #[tokio::test] async fn init_commits_timeline() { - let base_path = canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(); - let timeline = Timeline::new(base_path.to_str().unwrap()).await.unwrap(); + let base_url = + Url::from_file_path(canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap()) + .unwrap(); + let timeline = Timeline::new(base_url).await.unwrap(); assert_eq!( timeline.instants, vec![ diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index 56aae9d..7025a8c 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -19,6 +19,7 @@ use arrow_array::RecordBatch; use std::any::Any; +use std::collections::HashMap; use std::fmt::Debug; use std::fs::File; use std::sync::Arc; @@ -45,7 +46,7 @@ pub struct HudiDataSource { impl HudiDataSource { pub fn new(base_path: &str) -> Self { Self { - table: HudiTable::new(base_path), + table: HudiTable::new(base_path, HashMap::new()), } } pub(crate) async fn create_physical_plan( @@ -56,7 +57,7 @@ impl HudiDataSource { Ok(Arc::new(HudiExec::new(projections, schema, self.clone()))) } - fn get_record_batches(&self) -> datafusion_common::Result<Vec<RecordBatch>> { + fn get_record_batches(&mut self) -> datafusion_common::Result<Vec<RecordBatch>> { match self.table.get_latest_file_paths() { Ok(file_paths) => { let mut record_batches = Vec::new(); @@ -84,7 +85,7 @@ impl TableProvider for HudiDataSource { } fn schema(&self) -> SchemaRef { - self.table.schema() + self.table.get_latest_schema() } fn table_type(&self) -> TableType { @@ -161,7 +162,8 @@ impl ExecutionPlan for HudiExec { _partition: usize, _context: Arc<TaskContext>, ) -> datafusion_common::Result<SendableRecordBatchStream> { - let data = self.data_source.get_record_batches()?; + let mut data_source = self.data_source.clone(); + let data = data_source.get_record_batches()?; Ok(Box::pin(MemoryStream::try_new(data, self.schema(), None)?)) } } diff --git a/python/Cargo.toml b/python/Cargo.toml index 6c5702b..613b010 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -27,10 +27,22 @@ doc = false [dependencies] object_store = { workspace = true } +# arrow +arrow = { workspace = true } +arrow-arith = { workspace = true } +arrow-array = { workspace = true } +arrow-buffer = { workspace = true } +arrow-cast = { workspace = true } +arrow-ipc = { workspace = true } +arrow-json = { workspace 
= true } +arrow-ord = { workspace = true } +arrow-row = { workspace = true } +arrow-schema = { workspace = true } +arrow-select = { workspace = true } [dependencies.pyo3] -version = "0.21.2" -features = ["extension-module", "abi3", "abi3-py38", "gil-refs"] +version = "0.20.3" +features = ["extension-module", "abi3", "abi3-py38"] [dependencies.hudi] path = "../crates/hudi" diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py index fa70485..8054368 100644 --- a/python/hudi/__init__.py +++ b/python/hudi/__init__.py @@ -17,4 +17,5 @@ from ._internal import __version__ as __version__ from ._internal import rust_core_version as rust_core_version +from ._internal import HudiFileSlice as HudiFileSlice from .table import HudiTable as HudiTable diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi index 8759c2e..b34c25a 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_internal.pyi @@ -15,7 +15,9 @@ # specific language governing permissions and limitations # under the License. -from typing import List +from typing import List, Dict, Optional + +import pyarrow __version__: str @@ -23,8 +25,26 @@ __version__: str def rust_core_version() -> str: ... +class HudiFileSlice: + file_group_id: str + partition_path: str + commit_time: str + base_file_name: str + base_file_path: str + base_file_size: int + num_records: int + + class BindingHudiTable: - def __init__(self, table_uri: str): ... + def __init__( + self, + table_uri: str, + storage_options: Optional[Dict[str, str]] = None, + ): ... + + def schema(self) -> "pyarrow.Schema": ... + + def get_latest_file_slices(self) -> List[HudiFileSlice]: ... - def get_latest_file_paths(self) -> List[str]: ... + def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]: ... diff --git a/python/hudi/_internal.pyi b/python/hudi/_utils.py similarity index 78% copy from python/hudi/_internal.pyi copy to python/hudi/_utils.py index 8759c2e..779a14b 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_utils.py @@ -14,17 +14,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from typing import List, Any, Iterator -from typing import List -__version__: str - - -def rust_core_version() -> str: ... - - -class BindingHudiTable: - - def __init__(self, table_uri: str): ... - - def get_latest_file_paths(self) -> List[str]: ... +def split_list(lst: List[Any], n: int) -> Iterator[List[Any]]: + split_size = (len(lst) + n - 1) // n + for i in range(0, len(lst), split_size): + yield lst[i: i + split_size] diff --git a/python/hudi/table.py b/python/hudi/table.py index a8cfbcb..c9cab1b 100644 --- a/python/hudi/table.py +++ b/python/hudi/table.py @@ -15,18 +15,37 @@ # specific language governing permissions and limitations # under the License. 
+import os from dataclasses import dataclass from pathlib import Path -from typing import Union, List +from typing import Union, List, Iterator, Optional, Dict -from python.hudi._internal import BindingHudiTable +import pyarrow + +from hudi._internal import BindingHudiTable, HudiFileSlice +from hudi._utils import split_list @dataclass(init=False) class HudiTable: - def __init__(self, table_uri: Union[str, Path, ""]): - self._table = BindingHudiTable(str(table_uri)) + def __init__( + self, + table_uri: Union[str, Path, "os.PathLike[str]"], + storage_options: Optional[Dict[str, str]] = None, + ): + self._table = BindingHudiTable(str(table_uri), storage_options) + + def schema(self) -> "pyarrow.Schema": + return self._table.schema() + + def split_latest_file_slices(self, n) -> Iterator[List[HudiFileSlice]]: + file_slices = self.get_latest_file_slices() + for split in split_list(file_slices, n): + yield split + + def get_latest_file_slices(self) -> List[HudiFileSlice]: + return self._table.get_latest_file_slices() - def get_latest_file_paths(self) -> List[str]: - return self._table.get_latest_file_paths() + def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]: + return self._table.read_file_slice(relative_path) diff --git a/python/pyproject.toml b/python/pyproject.toml index 1abc6c1..175773c 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -22,6 +22,7 @@ build-backend = "maturin" [project] name = "hudi" description = "Native Hudi Python binding based on hudi-rs" +urls = { repository = "https://github.com/apache/hudi-rs/tree/main/python/" } readme = "README.md" requires-python = ">=3.8" license = "Apache License 2.0" @@ -37,8 +38,22 @@ dependencies = [ "pyarrow>=8" ] +optional-dependencies = { devel = [ + "pytest" +] } + dynamic = ["version"] [tool.maturin] module-name = "hudi._internal" features = ["pyo3/extension-module"] + +[tool.mypy] +files = "hudi/*.py" +exclude = "^tests" + +[tool.pytest.ini_options] +testpaths = [ + "tests", + "hudi", +] diff --git a/python/src/lib.rs b/python/src/lib.rs index 8ac33f4..07c81b0 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -17,35 +17,91 @@ * under the License. 
*/ +use std::collections::HashMap; +use std::path::PathBuf; + +use arrow::pyarrow::ToPyArrow; use pyo3::prelude::*; -use hudi::table::Table; +use hudi::file_group::FileSlice; +use hudi::HudiTable; + +#[pyclass] +struct HudiFileSlice { + #[pyo3(get)] + file_group_id: String, + #[pyo3(get)] + partition_path: String, + #[pyo3(get)] + commit_time: String, + #[pyo3(get)] + base_file_name: String, + #[pyo3(get)] + base_file_path: String, + #[pyo3(get)] + base_file_size: usize, + #[pyo3(get)] + num_records: i64, +} + +impl HudiFileSlice { + pub fn from_file_slice(f: FileSlice) -> Self { + let partition_path = f.partition_path.clone().unwrap_or("".to_string()); + let mut p = PathBuf::from(&partition_path); + p.push(f.base_file.info.name.clone()); + let base_file_path = p.to_str().unwrap().to_string(); + Self { + file_group_id: f.file_group_id().to_string(), + partition_path, + commit_time: f.base_file.commit_time, + base_file_name: f.base_file.info.name, + base_file_path, + base_file_size: f.base_file.info.size, + num_records: f.base_file.stats.unwrap().num_records, + } + } +} #[pyclass] struct BindingHudiTable { - _table: hudi::HudiTable, + _table: HudiTable, } #[pymethods] impl BindingHudiTable { #[new] - #[pyo3(signature = (table_uri))] - fn new(py: Python, table_uri: &str) -> PyResult<Self> { + #[pyo3(signature = (table_uri, storage_options = None))] + fn new( + py: Python, + table_uri: &str, + storage_options: Option<HashMap<String, String>>, + ) -> PyResult<Self> { py.allow_threads(|| { Ok(BindingHudiTable { - _table: Table::new(table_uri), + _table: HudiTable::new(table_uri, storage_options.unwrap_or_default()), }) }) } - pub fn get_latest_file_paths(&self) -> PyResult<Vec<String>> { - match self._table.get_latest_file_paths() { - Ok(paths) => Ok(paths), + pub fn schema(&self, py: Python) -> PyResult<PyObject> { + self._table.get_latest_schema().to_pyarrow(py) + } + + pub fn get_latest_file_slices(&mut self) -> PyResult<Vec<HudiFileSlice>> { + match self._table.get_latest_file_slices() { + Ok(file_slices) => Ok(file_slices + .into_iter() + .map(HudiFileSlice::from_file_slice) + .collect()), Err(_e) => { - panic!("Failed to retrieve the latest file paths.") + panic!("Failed to retrieve the latest file slices.") } } } + + pub fn read_file_slice(&mut self, relative_path: &str, py: Python) -> PyResult<PyObject> { + self._table.read_file_slice(relative_path).to_pyarrow(py) + } } #[pyfunction] @@ -58,6 +114,7 @@ fn _internal(_py: Python, m: &PyModule) -> PyResult<()> { m.add("__version__", env!("CARGO_PKG_VERSION"))?; m.add_function(wrap_pyfunction!(rust_core_version, m)?)?; + m.add_class::<HudiFileSlice>()?; m.add_class::<BindingHudiTable>()?; Ok(()) } diff --git a/python/hudi/__init__.py b/python/tests/__init__.py similarity index 83% copy from python/hudi/__init__.py copy to python/tests/__init__.py index fa70485..a67d5ea 100644 --- a/python/hudi/__init__.py +++ b/python/tests/__init__.py @@ -14,7 +14,3 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
- -from ._internal import __version__ as __version__ -from ._internal import rust_core_version as rust_core_version -from .table import HudiTable as HudiTable diff --git a/python/hudi/table.py b/python/tests/conftest.py similarity index 58% copy from python/hudi/table.py copy to python/tests/conftest.py index a8cfbcb..2dcfdeb 100644 --- a/python/hudi/table.py +++ b/python/tests/conftest.py @@ -15,18 +15,26 @@ # specific language governing permissions and limitations # under the License. -from dataclasses import dataclass +import os +import zipfile from pathlib import Path -from typing import Union, List -from python.hudi._internal import BindingHudiTable +import pytest -@dataclass(init=False) -class HudiTable: +def _extract_testing_table(zip_file_path, target_path) -> str: + with zipfile.ZipFile(zip_file_path, "r") as zip_ref: + zip_ref.extractall(target_path) + return os.path.join(target_path, "trips_table") - def __init__(self, table_uri: Union[str, Path, ""]): - self._table = BindingHudiTable(str(table_uri)) - def get_latest_file_paths(self) -> List[str]: - return self._table.get_latest_file_paths() +@pytest.fixture( + params=[ + "0.x_cow_partitioned", + ] +) +def get_sample_table(request, tmp_path) -> str: + fixture_path = "../crates/core/fixtures/table" + table_name = request.param + zip_file_path = Path(fixture_path).joinpath(f"{table_name}.zip") + return _extract_testing_table(zip_file_path, tmp_path) diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py new file mode 100644 index 0000000..20ce42c --- /dev/null +++ b/python/tests/test_table_read.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import pyarrow as pa +import pytest + +from hudi import HudiTable + +PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0) +pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="hudi only supported if pyarrow >= 8.0.0") + + +def test_sample_table(get_sample_table): + table_path = get_sample_table + table = HudiTable(table_path, {}) + + assert table.schema().names == ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key', + '_hoodie_partition_path', '_hoodie_file_name', 'ts', 'uuid', 'rider', 'driver', + 'fare', 'city'] + + file_slices = table.get_latest_file_slices() + assert len(file_slices) == 5 + assert set(f.commit_time for f in file_slices) == {'20240402123035233', '20240402144910683'} + assert all(f.num_records == 1 for f in file_slices) + file_slice_paths = [f.base_file_path for f in file_slices] + assert set(file_slice_paths) == {'chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet', + 'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet', + 'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet', + 'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet', + 'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'} + + batches = table.read_file_slice(file_slice_paths[0]) + t = pa.Table.from_batches(batches) + assert t.num_rows == 1 + assert t.num_columns == 11 + + file_slices_gen = table.split_latest_file_slices(2) + assert len(next(file_slices_gen)) == 3 + assert len(next(file_slices_gen)) == 2
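
For anyone wanting to try the Python binding introduced by this commit, here is a minimal usage sketch. The table path is a placeholder (any extracted copy of the 0.x_cow_partitioned fixture works); the calls mirror what python/tests/test_table_read.py exercises against the API declared in python/hudi/_internal.pyi.

    import pyarrow as pa

    from hudi import HudiTable

    # "/path/to/trips_table" is a placeholder; point it at an extracted Hudi table.
    table = HudiTable("/path/to/trips_table", storage_options={})

    # Latest table schema as a pyarrow.Schema.
    print(table.schema().names)

    # Latest file slices, each carrying file group id, commit time, size, and record count.
    for fs in table.get_latest_file_slices():
        print(fs.commit_time, fs.base_file_path, fs.num_records)

        # Read one file slice into pyarrow RecordBatches via its relative path.
        batches = table.read_file_slice(fs.base_file_path)
        print(pa.Table.from_batches(batches).num_rows)

split_latest_file_slices(n) is also available to partition the file slices into n chunks for parallel readers, as the test shows.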