This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
     new 4fa2440  feat: support time travel with read option (#52)
4fa2440 is described below

commit 4fa2440985fd66ec67aebac21f4df74e8f32821e
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Sat Jul 6 20:36:17 2024 -0500

    feat: support time travel with read option (#52)

    Support passing the option `hoodie.read.as.of.timestamp` to perform a
    time travel query.

    Fixes #39
---
 crates/core/src/config/mod.rs   | 13 +++++++++++-
 crates/core/src/config/read.rs  | 26 +++++++++---------------
 crates/core/src/table/mod.rs    | 45 ++++++++++++++++++++++++++---------------
 crates/datafusion/src/lib.rs    | 19 +++++++++--------
 python/src/lib.rs               |  5 -----
 python/tests/test_table_read.py |  4 +++-
 6 files changed, 62 insertions(+), 50 deletions(-)

diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index f8975cf..2b37dc7 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -59,7 +59,7 @@ pub trait ConfigParser: AsRef<str> {
     }
 }
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub enum HudiConfigValue {
     Boolean(bool),
     Integer(isize),
@@ -157,4 +157,15 @@ impl HudiConfigs {
     ) -> HudiConfigValue {
         parser.parse_value_or_default(&self.raw_configs)
     }
+
+    pub fn try_get(
+        &self,
+        parser: impl ConfigParser<Output = HudiConfigValue>,
+    ) -> Option<HudiConfigValue> {
+        let res = parser.parse_value(&self.raw_configs);
+        match res {
+            Ok(v) => Some(v),
+            Err(_) => parser.default_value(),
+        }
+    }
 }
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 2af57e8..195aa70 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -27,12 +27,14 @@ use strum_macros::EnumIter;
 #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiReadConfig {
     InputPartitions,
+    AsOfTimestamp,
 }
 
 impl AsRef<str> for HudiReadConfig {
     fn as_ref(&self) -> &str {
         match self {
             Self::InputPartitions => "hoodie.read.input.partitions",
+            Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
         }
     }
 }
@@ -43,6 +45,7 @@ impl ConfigParser for HudiReadConfig {
     fn default_value(&self) -> Option<HudiConfigValue> {
         match self {
             HudiReadConfig::InputPartitions => Some(HudiConfigValue::UInteger(0usize)),
+            _ => None,
         }
     }
 
@@ -53,28 +56,20 @@ impl ConfigParser for HudiReadConfig {
             .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
 
         match self {
-            HudiReadConfig::InputPartitions => get_result
-                .and_then(|v| {
-                    usize::from_str(v).map_err(|e| {
-                        anyhow!(
-                            "Failed to parse '{}' for config '{}': {}",
-                            v,
-                            self.as_ref(),
-                            e
-                        )
-                    })
-                })
+            Self::InputPartitions => get_result
+                .and_then(|v| usize::from_str(v).map_err(|e| anyhow!(e)))
                 .map(HudiConfigValue::UInteger),
+            Self::AsOfTimestamp => get_result.map(|v| HudiConfigValue::String(v.to_string())),
         }
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
-
     use crate::config::read::HudiReadConfig::InputPartitions;
     use crate::config::ConfigParser;
+    use std::collections::HashMap;
+    use std::num::ParseIntError;
 
     #[test]
     fn parse_valid_config_value() {
@@ -87,10 +82,7 @@
     fn parse_invalid_config_value() {
         let options = HashMap::from([(InputPartitions.as_ref().to_string(), "foo".to_string())]);
         let value = InputPartitions.parse_value(&options);
-        assert!(value.err().unwrap().to_string().starts_with(&format!(
-            "Failed to parse 'foo' for config '{}'",
-            InputPartitions.as_ref()
-        )));
+        assert!(value.err().unwrap().is::<ParseIntError>());
         assert_eq!(
             InputPartitions
                 .parse_value_or_default(&options)
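The heart of the config change above is the new `HudiConfigs::try_get`: it returns the parsed value when the option is set, falls back to the parser's default otherwise, and yields `None` for options such as `AsOfTimestamp` that define no default. Below is a minimal standalone sketch of those semantics; the types are illustrative stand-ins, not the crate's actual `ConfigParser` machinery.

    use std::collections::HashMap;

    #[derive(Clone, Debug, PartialEq)]
    enum ConfigValue {
        UInteger(usize),
        String(String),
    }

    #[derive(Clone, Copy)]
    enum ReadConfig {
        InputPartitions,
        AsOfTimestamp,
    }

    impl ReadConfig {
        fn key(self) -> &'static str {
            match self {
                ReadConfig::InputPartitions => "hoodie.read.input.partitions",
                ReadConfig::AsOfTimestamp => "hoodie.read.as.of.timestamp",
            }
        }

        // AsOfTimestamp has no default: its absence means "read the latest commit".
        fn default_value(self) -> Option<ConfigValue> {
            match self {
                ReadConfig::InputPartitions => Some(ConfigValue::UInteger(0)),
                ReadConfig::AsOfTimestamp => None,
            }
        }

        fn parse_value(self, raw: &HashMap<String, String>) -> Option<ConfigValue> {
            let v = raw.get(self.key())?;
            match self {
                ReadConfig::InputPartitions => v.parse().ok().map(ConfigValue::UInteger),
                ReadConfig::AsOfTimestamp => Some(ConfigValue::String(v.clone())),
            }
        }

        // Mirrors `HudiConfigs::try_get`: the parsed value when the key is set
        // and valid, otherwise the default, which itself may be absent.
        fn try_get(self, raw: &HashMap<String, String>) -> Option<ConfigValue> {
            self.parse_value(raw).or_else(|| self.default_value())
        }
    }

    fn main() {
        let raw = HashMap::from([(
            "hoodie.read.as.of.timestamp".to_string(),
            "20240402123035233".to_string(),
        )]);
        assert_eq!(
            ReadConfig::AsOfTimestamp.try_get(&raw),
            Some(ConfigValue::String("20240402123035233".to_string()))
        );
        // Unset and defaultless: callers see None and fall back to the timeline.
        assert_eq!(ReadConfig::AsOfTimestamp.try_get(&HashMap::new()), None);
        // Unset but with a default: the default wins.
        assert_eq!(
            ReadConfig::InputPartitions.try_get(&HashMap::new()),
            Some(ConfigValue::UInteger(0))
        );
    }

This defaultless-option design is what lets the table code below distinguish "user pinned a timestamp" from "read the latest state" without a sentinel value.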
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index db50525..bbc5673 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -34,6 +34,7 @@ use TableTypeValue::CopyOnWrite;
 
 use crate::config::internal::HudiInternalConfig;
 use crate::config::read::HudiReadConfig;
+use crate::config::read::HudiReadConfig::AsOfTimestamp;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
 use crate::file_group::FileSlice;
@@ -170,14 +171,17 @@ impl Table {
     }
 
     pub async fn get_file_slices(&self) -> Result<Vec<FileSlice>> {
-        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+        if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
+            self.get_file_slices_as_of(timestamp.to::<String>().as_str())
+                .await
+        } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
             self.get_file_slices_as_of(timestamp).await
         } else {
             Ok(Vec::new())
         }
     }
 
-    pub async fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
+    async fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
         self.file_system_view
             .load_file_slices_stats_as_of(timestamp)
             .await
@@ -186,14 +190,17 @@ impl Table {
     }
 
     pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> {
-        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+        if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
+            self.read_snapshot_as_of(timestamp.to::<String>().as_str())
+                .await
+        } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
             self.read_snapshot_as_of(timestamp).await
         } else {
             Ok(Vec::new())
         }
     }
 
-    pub async fn read_snapshot_as_of(&self, timestamp: &str) -> Result<Vec<RecordBatch>> {
+    async fn read_snapshot_as_of(&self, timestamp: &str) -> Result<Vec<RecordBatch>> {
         let file_slices = self
             .get_file_slices_as_of(timestamp)
             .await
@@ -233,8 +240,7 @@ mod tests {
 
     use url::Url;
 
-    use hudi_tests::{assert_not, TestTable};
-
+    use crate::config::read::HudiReadConfig::AsOfTimestamp;
     use crate::config::table::HudiTableConfig::{
         BaseFileFormat, Checksum, DatabaseName, DropsPartitionFields, IsHiveStylePartitioning,
         IsPartitionPathUrlencoded, KeyGeneratorClass, PartitionFields, PopulatesMetaFields,
@@ -243,6 +249,7 @@ mod tests {
     };
     use crate::storage::utils::join_url_segments;
    use crate::table::Table;
+    use hudi_tests::{assert_not, TestTable};
 
     #[tokio::test]
     async fn hudi_table_get_schema() {
@@ -331,8 +338,8 @@ mod tests {
     #[tokio::test]
     async fn hudi_table_get_file_slices_as_of_timestamps() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
+
+        let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
         let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
                 .iter()
@@ -343,10 +350,12 @@ mod tests {
         );
 
         // as of the latest timestamp
-        let file_slices = hudi_table
-            .get_file_slices_as_of("20240418173551906")
-            .await
-            .unwrap();
+        let opts = HashMap::from_iter(vec![(
+            AsOfTimestamp.as_ref().to_string(),
+            "20240418173551906".to_string(),
+        )]);
+        let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+        let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
                 .iter()
@@ -356,10 +365,12 @@ mod tests {
         );
 
         // as of just smaller than the latest timestamp
-        let file_slices = hudi_table
-            .get_file_slices_as_of("20240418173551905")
-            .await
-            .unwrap();
+        let opts = HashMap::from_iter(vec![(
+            AsOfTimestamp.as_ref().to_string(),
+            "20240418173551905".to_string(),
+        )]);
+        let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+        let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
                 .iter()
@@ -369,7 +380,9 @@ mod tests {
        );
 
         // as of non-exist old timestamp
-        let file_slices = hudi_table.get_file_slices_as_of("0").await.unwrap();
+        let opts = HashMap::from_iter(vec![(AsOfTimestamp.as_ref().to_string(), "0".to_string())]);
+        let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+        let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
                 .iter()
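The core behavioral change in table/mod.rs is the timestamp-resolution order: an explicit `hoodie.read.as.of.timestamp` read option wins, otherwise the latest completed commit on the timeline is used, and a table with no commits yields empty results. A self-contained sketch of that precedence follows; the `Timeline` type and field names here are illustrative, not the crate's actual structures.

    struct Timeline {
        completed_commit_timestamps: Vec<String>,
    }

    impl Timeline {
        fn get_latest_commit_timestamp(&self) -> Option<&str> {
            self.completed_commit_timestamps.last().map(String::as_str)
        }
    }

    // Resolution order used by get_file_slices() / read_snapshot():
    // 1. the as-of timestamp from the read options, if set;
    // 2. otherwise the latest completed commit;
    // 3. otherwise None, and the caller returns empty results.
    fn resolve_read_timestamp<'a>(
        as_of_option: Option<&'a str>,
        timeline: &'a Timeline,
    ) -> Option<&'a str> {
        as_of_option.or_else(|| timeline.get_latest_commit_timestamp())
    }

    fn main() {
        let timeline = Timeline {
            completed_commit_timestamps: vec!["20240418173551906".to_string()],
        };
        // An explicit as-of timestamp always wins over the latest commit.
        assert_eq!(
            resolve_read_timestamp(Some("20240418173551905"), &timeline),
            Some("20240418173551905")
        );
        // Without the option, fall back to the latest completed commit.
        assert_eq!(
            resolve_read_timestamp(None, &timeline),
            Some("20240418173551906")
        );
        // No commits and no option: empty results.
        let empty = Timeline { completed_commit_timestamps: Vec::new() };
        assert_eq!(resolve_read_timestamp(None, &empty), None);
    }

Note also that `get_file_slices_as_of` and `read_snapshot_as_of` lose their `pub` visibility in this commit: the read option becomes the only public way to time-travel, which the updated tests above exercise.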
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 1435d94..f7a2c43 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -39,29 +39,28 @@ use datafusion_physical_expr::create_physical_expr;
 use DataFusionError::Execution;
 
 use hudi_core::config::read::HudiReadConfig::InputPartitions;
-use hudi_core::config::ConfigParser;
 use hudi_core::storage::utils::{get_scheme_authority, parse_uri};
 use hudi_core::HudiTable;
 
 #[derive(Clone, Debug)]
 pub struct HudiDataSource {
     table: Arc<HudiTable>,
-    input_partitions: usize,
 }
 
 impl HudiDataSource {
     pub async fn new(base_uri: &str, options: HashMap<String, String>) -> Result<Self> {
-        let input_partitions = InputPartitions
-            .parse_value_or_default(&options)
-            .to::<usize>();
         match HudiTable::new(base_uri, options).await {
-            Ok(t) => Ok(Self {
-                table: Arc::new(t),
-                input_partitions,
-            }),
+            Ok(t) => Ok(Self { table: Arc::new(t) }),
             Err(e) => Err(Execution(format!("Failed to create Hudi table: {}", e))),
         }
     }
+
+    fn get_input_partitions(&self) -> usize {
+        self.table
+            .configs
+            .get_or_default(InputPartitions)
+            .to::<usize>()
+    }
 }
 
 #[async_trait]
@@ -92,7 +91,7 @@ impl TableProvider for HudiDataSource {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let file_slices = self
             .table
-            .split_file_slices(self.input_partitions)
+            .split_file_slices(self.get_input_partitions())
             .await
             .map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
         let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 8962cff..745b457 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -129,11 +129,6 @@ impl BindingHudiTable {
     pub fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
         rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
     }
-
-    pub fn read_snapshot_as_of(&self, timestamp: &str, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.read_snapshot_as_of(timestamp))?
-            .to_pyarrow(py)
-    }
 }
 
 #[cfg(not(tarpaulin))]
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 6370bd5..61195fb 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -65,7 +65,9 @@ def test_sample_table(get_sample_table):
         {'_hoodie_commit_time': '20240402123035233', 'ts': 1695516137016,
          'uuid': 'e3cf430c-889d-4015-bc98-59bdce1e530c', 'fare': 34.15}]
 
-    batches = table.read_snapshot_as_of("20240402123035233")
+    table = HudiTable(table_path, {
+        "hoodie.read.as.of.timestamp": "20240402123035233"})
+    batches = table.read_snapshot()
     t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
     assert t.to_pylist() == [{'_hoodie_commit_time': '20240402123035233', 'ts': 1695046462179,
                               'uuid': '9909a8b1-2d15-4d3d-8ec9-efc48c536a00', 'fare': 33.9},
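End to end, the option is just a string entry in the read-options map, as the Python test above shows. For Rust callers, a usage sketch against the `hudi_core` API touched by this commit might look like the following; it assumes `HudiTable` is the re-exported core `Table` with `new` and `read_snapshot` as shown in the diffs, the table path is a placeholder, and the tokio/anyhow setup is an assumed dependency of the calling crate.

    use std::collections::HashMap;

    use hudi_core::HudiTable;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // `hoodie.read.as.of.timestamp` pins every read to the given instant.
        let options = HashMap::from([(
            "hoodie.read.as.of.timestamp".to_string(),
            "20240402123035233".to_string(),
        )]);
        // "/tmp/trips_table" is a placeholder path for illustration.
        let table = HudiTable::new("/tmp/trips_table", options).await?;
        // With the option set, read_snapshot() resolves file slices as of the
        // configured instant; without it, it reads the latest commit as before.
        let batches = table.read_snapshot().await?;
        println!("read {} record batches", batches.len());
        Ok(())
    }

Because the option flows through `HudiConfigs` on the table itself, the same map also enables time travel through the DataFusion `HudiDataSource` and the Python binding without any API surface dedicated to it.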