This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push: new 199a25d feat: add APIs for time-travel read (#33) 199a25d is described below commit 199a25d82ba09c0bedeb12430bf22299603209b2 Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com> AuthorDate: Tue Jul 2 15:48:05 2024 -0500 feat: add APIs for time-travel read (#33) --- crates/core/src/file_group/mod.rs | 27 ++++++++--- crates/core/src/storage/mod.rs | 12 ++--- crates/core/src/table/fs_view.rs | 38 +++++++++------ crates/core/src/table/mod.rs | 98 +++++++++++++++++++++++++++++++++++---- crates/core/src/table/timeline.rs | 70 ++++++++++++++++++++-------- crates/datafusion/src/lib.rs | 14 ++---- crates/tests/src/lib.rs | 7 +++ python/hudi/_internal.pyi | 12 +++-- python/hudi/table.py | 22 +++++---- python/src/lib.rs | 31 ++++++++----- python/tests/test_table_read.py | 44 ++++++++++++++---- 11 files changed, 279 insertions(+), 96 deletions(-) diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs index 6b9b22c..c0af0b3 100644 --- a/crates/core/src/file_group/mod.rs +++ b/crates/core/src/file_group/mod.rs @@ -100,7 +100,7 @@ impl FileSlice { if self.base_file.stats.is_none() { let parquet_meta = storage .get_parquet_file_metadata(&self.base_file_relative_path()) - .await; + .await?; let num_records = parquet_meta.file_metadata().num_rows(); let stats = FileStats { num_records }; self.base_file.stats = Some(stats); @@ -163,12 +163,22 @@ impl FileGroup { } } - pub fn get_latest_file_slice(&self) -> Option<&FileSlice> { - return self.file_slices.values().next_back(); + pub fn get_file_slice_as_of(&self, timestamp: &str) -> Option<&FileSlice> { + let as_of = timestamp.to_string(); + return if let Some((_, file_slice)) = self.file_slices.range(..=as_of).next_back() { + Some(file_slice) + } else { + None + }; } - pub fn get_latest_file_slice_mut(&mut self) -> Option<&mut FileSlice> { - return self.file_slices.values_mut().next_back(); + pub fn get_file_slice_mut_as_of(&mut self, timestamp: &str) -> Option<&mut FileSlice> { + let as_of = timestamp.to_string(); + return if let Some((_, file_slice)) = self.file_slices.range_mut(..=as_of).next_back() { + Some(file_slice) + } else { + None + }; } } @@ -203,8 +213,11 @@ mod tests { let commit_times: Vec<&str> = fg.file_slices.keys().map(|k| k.as_str()).collect(); assert_eq!(commit_times, vec!["20240402123035233", "20240402144910683"]); assert_eq!( - fg.get_latest_file_slice().unwrap().base_file.commit_time, - "20240402144910683" + fg.get_file_slice_as_of("20240402123035233") + .unwrap() + .base_file + .commit_time, + "20240402123035233" ) } diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 76e085f..0f09c05 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -72,14 +72,14 @@ impl Storage { } } - pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> ParquetMetaData { - let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap(); - let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap(); + pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> Result<ParquetMetaData> { + let obj_url = join_url_segments(&self.base_url, &[relative_path])?; + let obj_path = ObjPath::from_url_path(obj_url.path())?; let obj_store = self.object_store.clone(); - let meta = obj_store.head(&obj_path).await.unwrap(); + let meta = obj_store.head(&obj_path).await?; let reader = ParquetObjectReader::new(obj_store, meta); - let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap(); - builder.metadata().as_ref().to_owned() + let builder = ParquetRecordBatchStreamBuilder::new(reader).await?; + Ok(builder.metadata().as_ref().to_owned()) } pub async fn get_file_data(&self, relative_path: &str) -> Bytes { diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs index 8f278dd..b7cd77c 100644 --- a/crates/core/src/table/fs_view.rs +++ b/crates/core/src/table/fs_view.rs @@ -44,7 +44,7 @@ impl FileSystemView { props: Arc<HashMap<String, String>>, ) -> Result<Self> { let storage = Storage::new(base_url, storage_options)?; - let partition_paths = Self::get_partition_paths(&storage).await?; + let partition_paths = Self::load_partition_paths(&storage).await?; let partition_to_file_groups = Self::load_file_groups_for_partitions(&storage, partition_paths).await?; let partition_to_file_groups = Arc::new(DashMap::from_iter(partition_to_file_groups)); @@ -55,7 +55,7 @@ impl FileSystemView { }) } - async fn get_partition_paths(storage: &Storage) -> Result<Vec<String>> { + async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> { let top_level_dirs: Vec<String> = storage .list_dirs(None) .await @@ -120,24 +120,25 @@ impl FileSystemView { Ok(file_groups) } - pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>> { + pub fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> { let mut file_slices = Vec::new(); for fgs in self.partition_to_file_groups.iter() { let fgs_ref = fgs.value(); for fg in fgs_ref { - if let Some(fsl) = fg.get_latest_file_slice() { - file_slices.push(fsl.clone()) + if let Some(fsl) = fg.get_file_slice_as_of(timestamp) { + // TODO: pass ref instead of copying + file_slices.push(fsl.clone()); } } } Ok(file_slices) } - pub async fn load_latest_file_slices_stats(&self) -> Result<()> { + pub async fn load_file_slices_stats_as_of(&self, timestamp: &str) -> Result<()> { for mut fgs in self.partition_to_file_groups.iter_mut() { let fgs_ref = fgs.value_mut(); for fg in fgs_ref { - if let Some(file_slice) = fg.get_latest_file_slice_mut() { + if let Some(file_slice) = fg.get_file_slice_mut_as_of(timestamp) { file_slice .load_stats(&self.storage) .await @@ -148,11 +149,17 @@ impl FileSystemView { Ok(()) } - pub async fn read_file_slice_by_path(&self, relative_path: &str) -> Result<Vec<RecordBatch>> { + pub async fn read_file_slice_by_path_unchecked( + &self, + relative_path: &str, + ) -> Result<Vec<RecordBatch>> { Ok(self.storage.get_parquet_file_data(relative_path).await) } - pub async fn read_file_slice(&self, file_slice: &FileSlice) -> Result<Vec<RecordBatch>> { - self.read_file_slice_by_path(&file_slice.base_file_relative_path()) + pub async fn read_file_slice_unchecked( + &self, + file_slice: &FileSlice, + ) -> Result<Vec<RecordBatch>> { + self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path()) .await } } @@ -171,7 +178,9 @@ mod tests { async fn get_partition_paths_for_nonpartitioned_table() { let base_url = TestTable::V6Nonpartitioned.url(); let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap(); - let partition_paths = FileSystemView::get_partition_paths(&storage).await.unwrap(); + let partition_paths = FileSystemView::load_partition_paths(&storage) + .await + .unwrap(); let partition_path_set: HashSet<&str> = HashSet::from_iter(partition_paths.iter().map(|p| p.as_str())); assert_eq!(partition_path_set, HashSet::from([""])) @@ -181,7 +190,9 @@ mod tests { async fn get_partition_paths_for_complexkeygen_table() { let base_url = TestTable::V6ComplexkeygenHivestyle.url(); let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap(); - let partition_paths = FileSystemView::get_partition_paths(&storage).await.unwrap(); + let partition_paths = FileSystemView::load_partition_paths(&storage) + .await + .unwrap(); let partition_path_set: HashSet<&str> = HashSet::from_iter(partition_paths.iter().map(|p| p.as_str())); assert_eq!( @@ -204,7 +215,8 @@ mod tests { ) .await .unwrap(); - let file_slices = fs_view.get_latest_file_slices().unwrap(); + + let file_slices = fs_view.get_file_slices_as_of("20240418173551906").unwrap(); assert_eq!(file_slices.len(), 1); let fg_ids = file_slices .iter() diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 825b2d4..52daf12 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -115,11 +115,42 @@ impl Table { } pub async fn get_file_slices(&self) -> Result<Vec<FileSlice>> { + if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() { + self.get_file_slices_as_of(timestamp).await + } else { + Ok(Vec::new()) + } + } + + pub async fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> { self.file_system_view - .load_latest_file_slices_stats() + .load_file_slices_stats_as_of(timestamp) .await - .expect("Successful loading of file slice stats."); - self.file_system_view.get_latest_file_slices() + .context("Fail to load file slice stats.")?; + self.file_system_view.get_file_slices_as_of(timestamp) + } + + pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> { + if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() { + self.read_snapshot_as_of(timestamp).await + } else { + Ok(Vec::new()) + } + } + + pub async fn read_snapshot_as_of(&self, timestamp: &str) -> Result<Vec<RecordBatch>> { + let file_slices = self + .get_file_slices_as_of(timestamp) + .await + .context(format!("Failed to get file slices as of {}", timestamp))?; + let mut batches = Vec::new(); + for f in file_slices { + match self.file_system_view.read_file_slice_unchecked(&f).await { + Ok(batch) => batches.extend(batch), + Err(e) => return Err(anyhow!("Failed to read file slice {:?} - {}", f, e)), + } + } + Ok(batches) } #[cfg(test)] @@ -133,13 +164,9 @@ impl Table { pub async fn read_file_slice_by_path(&self, relative_path: &str) -> Result<Vec<RecordBatch>> { self.file_system_view - .read_file_slice_by_path(relative_path) + .read_file_slice_by_path_unchecked(relative_path) .await } - - pub async fn read_file_slice(&self, file_slice: &FileSlice) -> Result<Vec<RecordBatch>> { - self.file_system_view.read_file_slice(file_slice).await - } } impl ProvidesTableMetadata for Table { @@ -240,7 +267,7 @@ mod tests { use crate::table::Table; #[tokio::test] - async fn hudi_table_get_latest_schema() { + async fn hudi_table_get_schema() { let base_url = TestTable::V6Nonpartitioned.url(); let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap(); let fields: Vec<String> = hudi_table @@ -308,7 +335,7 @@ mod tests { } #[tokio::test] - async fn hudi_table_get_latest_file_paths() { + async fn hudi_table_get_file_paths() { let base_url = TestTable::V6ComplexkeygenHivestyle.url(); let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap(); assert_eq!(hudi_table.timeline.instants.len(), 2); @@ -324,6 +351,57 @@ mod tests { assert_eq!(actual, expected); } + #[tokio::test] + async fn hudi_table_get_file_slices_as_of_timestamps() { + let base_url = TestTable::V6Nonpartitioned.url(); + let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap(); + + let file_slices = hudi_table.get_file_slices().await.unwrap(); + assert_eq!( + file_slices + .iter() + .map(|f| f.base_file_relative_path()) + .collect::<Vec<_>>(), + vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",] + ); + + // as of the latest timestamp + let file_slices = hudi_table + .get_file_slices_as_of("20240418173551906") + .await + .unwrap(); + assert_eq!( + file_slices + .iter() + .map(|f| f.base_file_relative_path()) + .collect::<Vec<_>>(), + vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",] + ); + + // as of just smaller than the latest timestamp + let file_slices = hudi_table + .get_file_slices_as_of("20240418173551905") + .await + .unwrap(); + assert_eq!( + file_slices + .iter() + .map(|f| f.base_file_relative_path()) + .collect::<Vec<_>>(), + vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",] + ); + + // as of non-exist old timestamp + let file_slices = hudi_table.get_file_slices_as_of("0").await.unwrap(); + assert_eq!( + file_slices + .iter() + .map(|f| f.base_file_relative_path()) + .collect::<Vec<_>>(), + Vec::<String>::new() + ); + } + #[tokio::test] async fn hudi_table_get_table_metadata() { let base_path = diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs index 9dcf6e2..70fc6ee 100644 --- a/crates/core/src/table/timeline.rs +++ b/crates/core/src/table/timeline.rs @@ -23,7 +23,7 @@ use std::fmt::Debug; use std::path::PathBuf; use std::sync::Arc; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use arrow_schema::Schema; use parquet::arrow::parquet_to_arrow_schema; use serde_json::{Map, Value}; @@ -112,6 +112,13 @@ impl Timeline { Ok(completed_commits) } + pub fn get_latest_commit_timestamp(&self) -> Option<&str> { + self.instants + .iter() + .next_back() + .map(|instant| instant.timestamp.as_str()) + } + async fn get_latest_commit_metadata(&self) -> Result<Map<String, Value>> { match self.instants.iter().next_back() { Some(instant) => { @@ -120,8 +127,8 @@ impl Timeline { let relative_path = commit_file_path.to_str().ok_or(anyhow!( "Failed to get commit file path for instant: {:?}", instant - )); - let bytes = self.storage.get_file_data(relative_path?).await; + ))?; + let bytes = self.storage.get_file_data(relative_path).await; let json: Value = serde_json::from_slice(&bytes)?; let commit_metadata = json .as_object() @@ -135,22 +142,29 @@ impl Timeline { pub async fn get_latest_schema(&self) -> Result<Schema> { let commit_metadata = self.get_latest_commit_metadata().await?; - if let Some(partition_to_write_stats) = commit_metadata["partitionToWriteStats"].as_object() - { - if let Some((_, value)) = partition_to_write_stats.iter().next() { - if let Some(first_value) = value.as_array().and_then(|arr| arr.first()) { - if let Some(path) = first_value["path"].as_str() { - let parquet_meta = self.storage.get_parquet_file_metadata(path).await; - let arrow_schema = parquet_to_arrow_schema( - parquet_meta.file_metadata().schema_descr(), - None, - )?; - return Ok(arrow_schema); - } - } - } + + let parquet_path = commit_metadata + .get("partitionToWriteStats") + .and_then(|v| v.as_object()) + .and_then(|obj| obj.values().next()) + .and_then(|value| value.as_array()) + .and_then(|arr| arr.first()) + .and_then(|first_value| first_value["path"].as_str()); + + if let Some(path) = parquet_path { + let parquet_meta = self + .storage + .get_parquet_file_metadata(path) + .await + .context("Failed to get parquet file metadata")?; + + parquet_to_arrow_schema(parquet_meta.file_metadata().schema_descr(), None) + .context("Failed to resolve the latest schema") + } else { + Err(anyhow!( + "Failed to resolve the latest schema: no file path found" + )) } - Err(anyhow!("Failed to resolve schema.")) } } @@ -168,7 +182,7 @@ mod tests { use crate::table::timeline::{Instant, State, Timeline}; #[tokio::test] - async fn read_latest_schema() { + async fn timeline_read_latest_schema() { let base_url = TestTable::V6Nonpartitioned.url(); let timeline = Timeline::new( Arc::new(base_url), @@ -181,6 +195,24 @@ mod tests { assert_eq!(table_schema.fields.len(), 21) } + #[tokio::test] + async fn timeline_read_latest_schema_from_empty_table() { + let base_url = TestTable::V6Empty.url(); + let timeline = Timeline::new( + Arc::new(base_url), + Arc::new(HashMap::new()), + Arc::new(HashMap::new()), + ) + .await + .unwrap(); + let table_schema = timeline.get_latest_schema().await; + assert!(table_schema.is_err()); + assert_eq!( + table_schema.err().unwrap().to_string(), + "Failed to resolve the latest schema: no file path found" + ) + } + #[tokio::test] async fn init_commits_timeline() { let base_url = diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs index f4a1bba..f677247 100644 --- a/crates/datafusion/src/lib.rs +++ b/crates/datafusion/src/lib.rs @@ -65,17 +65,11 @@ impl HudiDataSource { } async fn get_record_batches(&mut self) -> datafusion_common::Result<Vec<RecordBatch>> { - let file_slices = self.table.get_file_slices().await.map_err(|e| { - DataFusionError::Execution(format!("Failed to load file slices from table: {}", e)) - })?; - - let mut record_batches = Vec::new(); - for fsl in file_slices { - let batches = self.table.read_file_slice(&fsl).await.map_err(|e| { - DataFusionError::Execution(format!("Failed to read records from table: {}", e)) + let record_batches = + self.table.read_snapshot().await.map_err(|e| { + DataFusionError::Execution(format!("Failed to read snapshot: {}", e)) })?; - record_batches.extend(batches) - } + Ok(record_batches) } } diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs index e467818..f94d5fd 100644 --- a/crates/tests/src/lib.rs +++ b/crates/tests/src/lib.rs @@ -35,6 +35,7 @@ pub fn extract_test_table(zip_path: &Path) -> PathBuf { pub enum TestTable { V6ComplexkeygenHivestyle, + V6Empty, V6Nonpartitioned, } @@ -46,6 +47,7 @@ impl TestTable { Self::V6ComplexkeygenHivestyle => data_path .join("v6_complexkeygen_hivestyle.zip") .into_boxed_path(), + Self::V6Empty => data_path.join("v6_empty.zip").into_boxed_path(), Self::V6Nonpartitioned => data_path.join("v6_nonpartitioned.zip").into_boxed_path(), } } @@ -58,6 +60,11 @@ impl TestTable { .to_str() .unwrap() .to_string(), + Self::V6Empty => extract_test_table(&zip_path) + .join("v6_empty") + .to_str() + .unwrap() + .to_string(), Self::V6Nonpartitioned => extract_test_table(&zip_path) .join("v6_nonpartitioned") .to_str() diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi index 83ed929..67dd0cc 100644 --- a/python/hudi/_internal.pyi +++ b/python/hudi/_internal.pyi @@ -33,8 +33,10 @@ class HudiFileSlice: base_file_size: int num_records: int + @property def base_file_relative_path(self) -> str: ... + class BindingHudiTable: def __init__( @@ -43,8 +45,12 @@ class BindingHudiTable: storage_options: Optional[Dict[str, str]] = None, ): ... - def schema(self) -> "pyarrow.Schema": ... + def get_schema(self) -> "pyarrow.Schema": ... + + def get_file_slices(self) -> List[HudiFileSlice]: ... + + def read_file_slice(self, base_file_relative_path) -> List["pyarrow.RecordBatch"]: ... - def get_latest_file_slices(self) -> List[HudiFileSlice]: ... + def read_snapshot(self) -> List["pyarrow.RecordBatch"]: ... - def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]: ... + def read_snapshot_as_of(self, timestamp: str) -> List["pyarrow.RecordBatch"]: ... diff --git a/python/hudi/table.py b/python/hudi/table.py index c9cab1b..943f423 100644 --- a/python/hudi/table.py +++ b/python/hudi/table.py @@ -36,16 +36,22 @@ class HudiTable: ): self._table = BindingHudiTable(str(table_uri), storage_options) - def schema(self) -> "pyarrow.Schema": - return self._table.schema() + def get_schema(self) -> "pyarrow.Schema": + return self._table.get_schema() - def split_latest_file_slices(self, n) -> Iterator[List[HudiFileSlice]]: - file_slices = self.get_latest_file_slices() + def split_file_slices(self, n: int) -> Iterator[List[HudiFileSlice]]: + file_slices = self.get_file_slices() for split in split_list(file_slices, n): yield split - def get_latest_file_slices(self) -> List[HudiFileSlice]: - return self._table.get_latest_file_slices() + def get_file_slices(self) -> List[HudiFileSlice]: + return self._table.get_file_slices() - def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]: - return self._table.read_file_slice(relative_path) + def read_file_slice(self, base_file_relative_path: str) -> List["pyarrow.RecordBatch"]: + return self._table.read_file_slice(base_file_relative_path) + + def read_snapshot(self) -> List["pyarrow.RecordBatch"]: + return self._table.read_snapshot() + + def read_snapshot_as_of(self, timestamp: str) -> List["pyarrow.RecordBatch"]: + return self._table.read_snapshot_as_of(timestamp) diff --git a/python/src/lib.rs b/python/src/lib.rs index 2436ee5..10f39ec 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -21,7 +21,7 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::OnceLock; -use anyhow::anyhow; +use anyhow::Context; use arrow::pyarrow::ToPyArrow; use pyo3::prelude::*; use tokio::runtime::Runtime; @@ -51,15 +51,15 @@ struct HudiFileSlice { #[pymethods] impl HudiFileSlice { pub fn base_file_relative_path(&self) -> PyResult<String> { - let mut p = PathBuf::from(&self.partition_path); - p.push(&self.base_file_name); - match p.to_str() { - Some(s) => Ok(s.to_string()), - None => Err(PyErr::from(anyhow!( + PathBuf::from(&self.partition_path) + .join(&self.base_file_name) + .to_str() + .map(String::from) + .context(format!( "Failed to get base file relative path for file slice: {:?}", self - ))), - } + )) + .map_err(PyErr::from) } } @@ -100,21 +100,30 @@ impl BindingHudiTable { Ok(BindingHudiTable { _table }) } - pub fn schema(&self, py: Python) -> PyResult<PyObject> { + pub fn get_schema(&self, py: Python) -> PyResult<PyObject> { rt().block_on(self._table.get_schema())?.to_pyarrow(py) } - pub fn get_latest_file_slices(&mut self, py: Python) -> PyResult<Vec<HudiFileSlice>> { + pub fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> { py.allow_threads(|| { let file_slices = rt().block_on(self._table.get_file_slices())?; Ok(file_slices.iter().map(convert_file_slice).collect()) }) } - pub fn read_file_slice(&mut self, relative_path: &str, py: Python) -> PyResult<PyObject> { + pub fn read_file_slice(&self, relative_path: &str, py: Python) -> PyResult<PyObject> { rt().block_on(self._table.read_file_slice_by_path(relative_path))? .to_pyarrow(py) } + + pub fn read_snapshot(&self, py: Python) -> PyResult<PyObject> { + rt().block_on(self._table.read_snapshot())?.to_pyarrow(py) + } + + pub fn read_snapshot_as_of(&self, timestamp: &str, py: Python) -> PyResult<PyObject> { + rt().block_on(self._table.read_snapshot_as_of(timestamp))? + .to_pyarrow(py) + } } #[cfg(not(tarpaulin))] diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index f0266f0..aec6d70 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -28,26 +28,52 @@ def test_sample_table(get_sample_table): table_path = get_sample_table table = HudiTable(table_path, {}) - assert table.schema().names == ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key', - '_hoodie_partition_path', '_hoodie_file_name', 'ts', 'uuid', 'rider', 'driver', - 'fare', 'city'] + assert table.get_schema().names == ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key', + '_hoodie_partition_path', '_hoodie_file_name', 'ts', 'uuid', 'rider', 'driver', + 'fare', 'city'] - file_slices = table.get_latest_file_slices() + file_slices = table.get_file_slices() assert len(file_slices) == 5 assert set(f.commit_time for f in file_slices) == {'20240402123035233', '20240402144910683'} assert all(f.num_records == 1 for f in file_slices) file_slice_paths = [f.base_file_relative_path() for f in file_slices] assert set(file_slice_paths) == {'chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet', - 'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet', - 'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet', - 'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet', - 'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'} + 'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet', + 'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet', + 'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet', + 'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'} batches = table.read_file_slice(file_slice_paths[0]) t = pa.Table.from_batches(batches) assert t.num_rows == 1 assert t.num_columns == 11 - file_slices_gen = table.split_latest_file_slices(2) + file_slices_gen = table.split_file_slices(2) assert len(next(file_slices_gen)) == 3 assert len(next(file_slices_gen)) == 2 + + batches = table.read_snapshot() + t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") + assert t.to_pylist() == [{'_hoodie_commit_time': '20240402144910683', 'ts': 1695046462179, + 'uuid': '9909a8b1-2d15-4d3d-8ec9-efc48c536a00', 'fare': 339.0}, + {'_hoodie_commit_time': '20240402123035233', 'ts': 1695091554788, + 'uuid': 'e96c4396-3fad-413a-a942-4cb36106d721', 'fare': 27.7}, + {'_hoodie_commit_time': '20240402123035233', 'ts': 1695115999911, + 'uuid': 'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa', 'fare': 17.85}, + {'_hoodie_commit_time': '20240402123035233', 'ts': 1695159649087, + 'uuid': '334e26e9-8355-45cc-97c6-c31daf0df330', 'fare': 19.1}, + {'_hoodie_commit_time': '20240402123035233', 'ts': 1695516137016, + 'uuid': 'e3cf430c-889d-4015-bc98-59bdce1e530c', 'fare': 34.15}] + + batches = table.read_snapshot_as_of("20240402123035233") + t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts") + assert t.to_pylist() == [{'_hoodie_commit_time': '20240402123035233', 'ts': 1695046462179, + 'uuid': '9909a8b1-2d15-4d3d-8ec9-efc48c536a00', 'fare': 33.9}, + {'_hoodie_commit_time': '20240402123035233', 'ts': 1695091554788, + 'uuid': 'e96c4396-3fad-413a-a942-4cb36106d721', 'fare': 27.7}, + {'_hoodie_commit_time': '20240402123035233', 'ts': 1695115999911, + 'uuid': 'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa', 'fare': 17.85}, + {'_hoodie_commit_time': '20240402123035233', 'ts': 1695159649087, + 'uuid': '334e26e9-8355-45cc-97c6-c31daf0df330', 'fare': 19.1}, + {'_hoodie_commit_time': '20240402123035233', 'ts': 1695516137016, + 'uuid': 'e3cf430c-889d-4015-bc98-59bdce1e530c', 'fare': 34.15}]