This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fa2440  feat: support time travel with read option (#52)
4fa2440 is described below

commit 4fa2440985fd66ec67aebac21f4df74e8f32821e
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Sat Jul 6 20:36:17 2024 -0500

    feat: support time travel with read option (#52)
    
    Support passing the option `hoodie.read.as.of.timestamp` to perform time travel queries.
    
    Fixes #39
---
 crates/core/src/config/mod.rs   | 13 +++++++++++-
 crates/core/src/config/read.rs  | 26 +++++++++---------------
 crates/core/src/table/mod.rs    | 45 ++++++++++++++++++++++++++---------------
 crates/datafusion/src/lib.rs    | 19 +++++++++--------
 python/src/lib.rs               |  5 -----
 python/tests/test_table_read.py |  4 +++-
 6 files changed, 62 insertions(+), 50 deletions(-)

diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index f8975cf..2b37dc7 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -59,7 +59,7 @@ pub trait ConfigParser: AsRef<str> {
     }
 }
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub enum HudiConfigValue {
     Boolean(bool),
     Integer(isize),
@@ -157,4 +157,15 @@ impl HudiConfigs {
     ) -> HudiConfigValue {
         parser.parse_value_or_default(&self.raw_configs)
     }
+
+    pub fn try_get(
+        &self,
+        parser: impl ConfigParser<Output = HudiConfigValue>,
+    ) -> Option<HudiConfigValue> {
+        let res = parser.parse_value(&self.raw_configs);
+        match res {
+            Ok(v) => Some(v),
+            Err(_) => parser.default_value(),
+        }
+    }
 }
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 2af57e8..195aa70 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -27,12 +27,14 @@ use strum_macros::EnumIter;
 #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiReadConfig {
     InputPartitions,
+    AsOfTimestamp,
 }
 
 impl AsRef<str> for HudiReadConfig {
     fn as_ref(&self) -> &str {
         match self {
             Self::InputPartitions => "hoodie.read.input.partitions",
+            Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
         }
     }
 }
@@ -43,6 +45,7 @@ impl ConfigParser for HudiReadConfig {
     fn default_value(&self) -> Option<HudiConfigValue> {
         match self {
             HudiReadConfig::InputPartitions => Some(HudiConfigValue::UInteger(0usize)),
+            _ => None,
         }
     }
 
@@ -53,28 +56,20 @@ impl ConfigParser for HudiReadConfig {
             .ok_or(anyhow!("Config '{}' not found", self.as_ref()));
 
         match self {
-            HudiReadConfig::InputPartitions => get_result
-                .and_then(|v| {
-                    usize::from_str(v).map_err(|e| {
-                        anyhow!(
-                            "Failed to parse '{}' for config '{}': {}",
-                            v,
-                            self.as_ref(),
-                            e
-                        )
-                    })
-                })
+            Self::InputPartitions => get_result
+                .and_then(|v| usize::from_str(v).map_err(|e| anyhow!(e)))
                 .map(HudiConfigValue::UInteger),
+            Self::AsOfTimestamp => get_result.map(|v| HudiConfigValue::String(v.to_string())),
         }
     }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
-
     use crate::config::read::HudiReadConfig::InputPartitions;
     use crate::config::ConfigParser;
+    use std::collections::HashMap;
+    use std::num::ParseIntError;
 
     #[test]
     fn parse_valid_config_value() {
@@ -87,10 +82,7 @@ mod tests {
     fn parse_invalid_config_value() {
         let options = HashMap::from([(InputPartitions.as_ref().to_string(), "foo".to_string())]);
         let value = InputPartitions.parse_value(&options);
-        assert!(value.err().unwrap().to_string().starts_with(&format!(
-            "Failed to parse 'foo' for config '{}'",
-            InputPartitions.as_ref()
-        )));
+        assert!(value.err().unwrap().is::<ParseIntError>());
         assert_eq!(
             InputPartitions
                 .parse_value_or_default(&options)
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index db50525..bbc5673 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -34,6 +34,7 @@ use TableTypeValue::CopyOnWrite;
 
 use crate::config::internal::HudiInternalConfig;
 use crate::config::read::HudiReadConfig;
+use crate::config::read::HudiReadConfig::AsOfTimestamp;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
 use crate::file_group::FileSlice;
@@ -170,14 +171,17 @@ impl Table {
     }
 
     pub async fn get_file_slices(&self) -> Result<Vec<FileSlice>> {
-        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+        if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
+            self.get_file_slices_as_of(timestamp.to::<String>().as_str())
+                .await
+        } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
             self.get_file_slices_as_of(timestamp).await
         } else {
             Ok(Vec::new())
         }
     }
 
-    pub async fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
+    async fn get_file_slices_as_of(&self, timestamp: &str) -> Result<Vec<FileSlice>> {
         self.file_system_view
             .load_file_slices_stats_as_of(timestamp)
             .await
@@ -186,14 +190,17 @@ impl Table {
     }
 
     pub async fn read_snapshot(&self) -> Result<Vec<RecordBatch>> {
-        if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
+        if let Some(timestamp) = self.configs.try_get(AsOfTimestamp) {
+            self.read_snapshot_as_of(timestamp.to::<String>().as_str())
+                .await
+        } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
             self.read_snapshot_as_of(timestamp).await
         } else {
             Ok(Vec::new())
         }
     }
 
-    pub async fn read_snapshot_as_of(&self, timestamp: &str) -> Result<Vec<RecordBatch>> {
+    async fn read_snapshot_as_of(&self, timestamp: &str) -> Result<Vec<RecordBatch>> {
         let file_slices = self
             .get_file_slices_as_of(timestamp)
             .await
@@ -233,8 +240,7 @@ mod tests {
 
     use url::Url;
 
-    use hudi_tests::{assert_not, TestTable};
-
+    use crate::config::read::HudiReadConfig::AsOfTimestamp;
     use crate::config::table::HudiTableConfig::{
         BaseFileFormat, Checksum, DatabaseName, DropsPartitionFields, IsHiveStylePartitioning,
         IsPartitionPathUrlencoded, KeyGeneratorClass, PartitionFields, PopulatesMetaFields,
@@ -243,6 +249,7 @@ mod tests {
     };
     use crate::storage::utils::join_url_segments;
     use crate::table::Table;
+    use hudi_tests::{assert_not, TestTable};
 
     #[tokio::test]
     async fn hudi_table_get_schema() {
@@ -331,8 +338,8 @@ mod tests {
     #[tokio::test]
     async fn hudi_table_get_file_slices_as_of_timestamps() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
 
+        let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
         let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
@@ -343,10 +350,12 @@ mod tests {
         );
 
         // as of the latest timestamp
-        let file_slices = hudi_table
-            .get_file_slices_as_of("20240418173551906")
-            .await
-            .unwrap();
+        let opts = HashMap::from_iter(vec![(
+            AsOfTimestamp.as_ref().to_string(),
+            "20240418173551906".to_string(),
+        )]);
+        let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+        let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
                 .iter()
@@ -356,10 +365,12 @@ mod tests {
         );
 
         // as of just smaller than the latest timestamp
-        let file_slices = hudi_table
-            .get_file_slices_as_of("20240418173551905")
-            .await
-            .unwrap();
+        let opts = HashMap::from_iter(vec![(
+            AsOfTimestamp.as_ref().to_string(),
+            "20240418173551905".to_string(),
+        )]);
+        let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+        let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
                 .iter()
@@ -369,7 +380,9 @@ mod tests {
         );
 
         // as of non-exist old timestamp
-        let file_slices = hudi_table.get_file_slices_as_of("0").await.unwrap();
+        let opts = HashMap::from_iter(vec![(AsOfTimestamp.as_ref().to_string(), "0".to_string())]);
+        let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+        let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
                 .iter()
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 1435d94..f7a2c43 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -39,29 +39,28 @@ use datafusion_physical_expr::create_physical_expr;
 use DataFusionError::Execution;
 
 use hudi_core::config::read::HudiReadConfig::InputPartitions;
-use hudi_core::config::ConfigParser;
 use hudi_core::storage::utils::{get_scheme_authority, parse_uri};
 use hudi_core::HudiTable;
 
 #[derive(Clone, Debug)]
 pub struct HudiDataSource {
     table: Arc<HudiTable>,
-    input_partitions: usize,
 }
 
 impl HudiDataSource {
     pub async fn new(base_uri: &str, options: HashMap<String, String>) -> Result<Self> {
-        let input_partitions = InputPartitions
-            .parse_value_or_default(&options)
-            .to::<usize>();
         match HudiTable::new(base_uri, options).await {
-            Ok(t) => Ok(Self {
-                table: Arc::new(t),
-                input_partitions,
-            }),
+            Ok(t) => Ok(Self { table: Arc::new(t) }),
             Err(e) => Err(Execution(format!("Failed to create Hudi table: {}", e))),
         }
     }
+
+    fn get_input_partitions(&self) -> usize {
+        self.table
+            .configs
+            .get_or_default(InputPartitions)
+            .to::<usize>()
+    }
 }
 
 #[async_trait]
@@ -92,7 +91,7 @@ impl TableProvider for HudiDataSource {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let file_slices = self
             .table
-            .split_file_slices(self.input_partitions)
+            .split_file_slices(self.get_input_partitions())
             .await
             .map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
         let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 8962cff..745b457 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -129,11 +129,6 @@ impl BindingHudiTable {
     pub fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
         rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
     }
-
-    pub fn read_snapshot_as_of(&self, timestamp: &str, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.read_snapshot_as_of(timestamp))?
-            .to_pyarrow(py)
-    }
 }
 
 #[cfg(not(tarpaulin))]
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 6370bd5..61195fb 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -65,7 +65,9 @@ def test_sample_table(get_sample_table):
                              {'_hoodie_commit_time': '20240402123035233', 'ts': 1695516137016,
                               'uuid': 'e3cf430c-889d-4015-bc98-59bdce1e530c', 'fare': 34.15}]
 
-    batches = table.read_snapshot_as_of("20240402123035233")
+    table = HudiTable(table_path, {
+        "hoodie.read.as.of.timestamp": "20240402123035233"})
+    batches = table.read_snapshot()
     t = pa.Table.from_batches(batches).select([0, 5, 6, 9]).sort_by("ts")
     assert t.to_pylist() == [{'_hoodie_commit_time': '20240402123035233', 'ts': 1695046462179,
                               'uuid': '9909a8b1-2d15-4d3d-8ec9-efc48c536a00', 'fare': 33.9},

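For reference, here is a minimal end-to-end sketch of the new read option used through the Python binding, mirroring the test change above. The option key and the HudiTable/read_snapshot calls come from this diff; the import path and the table path are assumptions for illustration.

    import pyarrow as pa

    from hudi import HudiTable  # assumed import path, matching the project's Python tests

    # Time travel: request a snapshot as of a specific commit timestamp by
    # passing the new read option when constructing the table.
    table = HudiTable(
        "/path/to/hudi_table",  # placeholder table path
        {"hoodie.read.as.of.timestamp": "20240402123035233"},
    )
    batches = table.read_snapshot()  # Arrow record batches as of that instant
    snapshot = pa.Table.from_batches(batches)
    print(snapshot.num_rows)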