This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new af997b7  feat: support MOR read-optimized query (#259)
af997b7 is described below

commit af997b772573bf2e44d81de7f0cd2a53433bc93b
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Jan 21 23:04:56 2025 -0600

    feat: support MOR read-optimized query (#259)
    
    Add a read config `hoodie.read.use.read_optimized.mode` to allow performing read-optimized queries on MOR tables.
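
    For illustration, a minimal usage sketch of the new option (assuming the
    crate's `Table::new_with_options` constructor; the table path below is
    hypothetical, not part of this commit):

        use hudi::table::Table;

        #[tokio::main]
        async fn main() -> Result<(), Box<dyn std::error::Error>> {
            // Enable the new read config; it defaults to "false".
            let table = Table::new_with_options(
                "/tmp/hudi_mor_table", // hypothetical MOR table path
                [("hoodie.read.use.read_optimized.mode", "true")],
            )
            .await?;
            // For a MOR table, the snapshot read now serves records from
            // base files only, skipping log files.
            let batches = table.read_snapshot(&[]).await?;
            println!("read {} record batches", batches.len());
            Ok(())
        }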
---
 crates/core/src/config/read.rs | 72 ++++++++++++++++++++++++++++----
 crates/core/src/table/mod.rs   | 93 ++++++++++++++++++++++++++++++++----------
 crates/datafusion/src/lib.rs   | 13 +-----
 3 files changed, 137 insertions(+), 41 deletions(-)

diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 4f131c2..97d8a0f 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -23,7 +23,7 @@ use std::str::FromStr;
 
 use strum_macros::EnumIter;
 
-use crate::config::error::ConfigError::{NotFound, ParseInt};
+use crate::config::error::ConfigError::{NotFound, ParseBool, ParseInt};
 use crate::config::Result;
 use crate::config::{ConfigParser, HudiConfigValue};
 
@@ -52,6 +52,10 @@ pub enum HudiReadConfig {
 
     /// Parallelism for listing files on storage.
     ListingParallelism,
+
+    /// When set to true, only [BaseFile]s will be read for optimized reads.
+    /// This is only applicable to Merge-On-Read (MOR) tables.
+    UseReadOptimizedMode,
 }
 
 impl AsRef<str> for HudiReadConfig {
@@ -60,6 +64,7 @@ impl AsRef<str> for HudiReadConfig {
             Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
             Self::InputPartitions => "hoodie.read.input.partitions",
             Self::ListingParallelism => "hoodie.read.listing.parallelism",
+            Self::UseReadOptimizedMode => "hoodie.read.use.read_optimized.mode",
         }
     }
 }
@@ -71,6 +76,7 @@ impl ConfigParser for HudiReadConfig {
         match self {
             HudiReadConfig::InputPartitions => Some(HudiConfigValue::UInteger(0usize)),
             HudiReadConfig::ListingParallelism => Some(HudiConfigValue::UInteger(10usize)),
+            HudiReadConfig::UseReadOptimizedMode => Some(HudiConfigValue::Boolean(false)),
             _ => None,
         }
     }
@@ -93,6 +99,11 @@ impl ConfigParser for HudiReadConfig {
                     usize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
                 })
                 .map(HudiConfigValue::UInteger),
+            Self::UseReadOptimizedMode => get_result
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e))
+                })
+                .map(HudiConfigValue::Boolean),
         }
     }
 }
@@ -100,25 +111,70 @@ impl ConfigParser for HudiReadConfig {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::config::read::HudiReadConfig::InputPartitions;
+    use crate::config::read::HudiReadConfig::{
+        InputPartitions, ListingParallelism, UseReadOptimizedMode,
+    };
 
     #[test]
     fn parse_valid_config_value() {
-        let options = HashMap::from([(InputPartitions.as_ref().to_string(), "100".to_string())]);
-        let value = InputPartitions.parse_value(&options).unwrap().to::<usize>();
-        assert_eq!(value, 100);
+        let options = HashMap::from([
+            (InputPartitions.as_ref().to_string(), "100".to_string()),
+            (ListingParallelism.as_ref().to_string(), "100".to_string()),
+            (
+                UseReadOptimizedMode.as_ref().to_string(),
+                "true".to_string(),
+            ),
+        ]);
+        assert_eq!(
+            InputPartitions.parse_value(&options).unwrap().to::<usize>(),
+            100
+        );
+        assert_eq!(
+            ListingParallelism
+                .parse_value(&options)
+                .unwrap()
+                .to::<usize>(),
+            100
+        );
+        assert!(UseReadOptimizedMode
+            .parse_value(&options)
+            .unwrap()
+            .to::<bool>());
     }
 
     #[test]
     fn parse_invalid_config_value() {
-        let options = HashMap::from([(InputPartitions.as_ref().to_string(), "foo".to_string())]);
-        let value = InputPartitions.parse_value(&options);
-        assert!(matches!(value.unwrap_err(), ParseInt(_, _, _)));
+        let options = HashMap::from([
+            (InputPartitions.as_ref().to_string(), "foo".to_string()),
+            (ListingParallelism.as_ref().to_string(), "_100".to_string()),
+            (UseReadOptimizedMode.as_ref().to_string(), "1".to_string()),
+        ]);
+        assert!(matches!(
+            InputPartitions.parse_value(&options).unwrap_err(),
+            ParseInt(_, _, _)
+        ));
         assert_eq!(
             InputPartitions
                 .parse_value_or_default(&options)
                 .to::<usize>(),
             0
         );
+        assert!(matches!(
+            ListingParallelism.parse_value(&options).unwrap_err(),
+            ParseInt(_, _, _)
+        ));
+        assert_eq!(
+            ListingParallelism
+                .parse_value_or_default(&options)
+                .to::<usize>(),
+            10
+        );
+        assert!(matches!(
+            UseReadOptimizedMode.parse_value(&options).unwrap_err(),
+            ParseBool(_, _, _)
+        ));
+        assert!(!UseReadOptimizedMode
+            .parse_value_or_default(&options)
+            .to::<bool>());
     }
 }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 7f5a82c..07c173e 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -90,11 +90,10 @@ mod fs_view;
 mod listing;
 pub mod partition;
 
-use crate::config::read::HudiReadConfig::AsOfTimestamp;
+use crate::config::read::HudiReadConfig::{AsOfTimestamp, UseReadOptimizedMode};
 use crate::config::table::HudiTableConfig::PartitionFields;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
-use crate::error::CoreError;
 use crate::expr::filter::{Filter, FilterField};
 use crate::file_group::file_slice::FileSlice;
 use crate::file_group::reader::FileGroupReader;
@@ -141,11 +140,25 @@ impl Table {
             .await
     }
 
-    pub fn base_url(&self) -> Result<Url> {
+    #[inline]
+    pub fn base_url(&self) -> Url {
+        let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath);
         self.hudi_configs
-            .get(HudiTableConfig::BasePath)?
+            .get(HudiTableConfig::BasePath)
+            .expect(&err_msg)
             .to_url()
-            .map_err(CoreError::from)
+            .expect(&err_msg)
+    }
+
+    #[inline]
+    pub fn table_type(&self) -> TableTypeValue {
+        let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableType);
+        let table_type = self
+            .hudi_configs
+            .get(HudiTableConfig::TableType)
+            .expect(&err_msg)
+            .to::<String>();
+        TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
     }
 
     #[inline]
@@ -176,16 +189,6 @@ impl Table {
             .register_object_store(runtime_env.clone());
     }
 
-    pub fn get_table_type(&self) -> TableTypeValue {
-        let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableType);
-        let table_type = self
-            .hudi_configs
-            .get(HudiTableConfig::TableType)
-            .expect(&err_msg)
-            .to::<String>();
-        TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
-    }
-
     /// Get the latest [Schema] of the table.
     pub async fn get_schema(&self) -> Result<Schema> {
         self.timeline.get_latest_schema().await
@@ -289,11 +292,21 @@ impl Table {
     ///
     /// If the [AsOfTimestamp] configuration is set, the records at the specified timestamp will be returned.
     pub async fn read_snapshot(&self, filters: &[Filter]) -> Result<Vec<RecordBatch>> {
+        let read_optimized_mode = self
+            .hudi_configs
+            .get_or_default(UseReadOptimizedMode)
+            .to::<bool>();
+
         if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
-            self.read_snapshot_as_of(timestamp.to::<String>().as_str(), filters)
-                .await
+            self.read_snapshot_as_of(
+                timestamp.to::<String>().as_str(),
+                filters,
+                read_optimized_mode,
+            )
+            .await
         } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
-            self.read_snapshot_as_of(timestamp, filters).await
+            self.read_snapshot_as_of(timestamp, filters, read_optimized_mode)
+                .await
         } else {
             Ok(Vec::new())
         }
@@ -304,10 +317,12 @@ impl Table {
         &self,
         timestamp: &str,
         filters: &[Filter],
+        read_optimized_mode: bool,
     ) -> Result<Vec<RecordBatch>> {
         let file_slices = self.get_file_slices_as_of(timestamp, filters).await?;
         let fg_reader = self.create_file_group_reader();
-        let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;
+        let base_file_only =
+            read_optimized_mode || self.table_type() == TableTypeValue::CopyOnWrite;
         let timezone = self.timezone();
         let instant_range = InstantRange::up_to(timestamp, &timezone);
         let batches = futures::future::try_join_all(
@@ -351,7 +366,9 @@ impl Table {
         ];
         let fg_reader =
             self.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())?;
-        let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;
+
+        // Read-optimized mode does not apply to incremental query semantics.
+        let base_file_only = self.table_type() == TableTypeValue::CopyOnWrite;
         let timezone = self.timezone();
         let instant_range =
             InstantRange::within_open_closed(start_timestamp, end_timestamp, &timezone);
@@ -416,7 +433,7 @@ mod tests {
     /// Test helper to get relative file paths from the table with filters.
     async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) -> Result<Vec<String>> {
         let mut file_paths = Vec::new();
-        let base_url = table.base_url()?;
+        let base_url = table.base_url();
         for f in table.get_file_slices(filters).await? {
             let relative_path = f.base_file_relative_path()?;
             let file_url = join_url_segments(&base_url, &[relative_path.as_str()])?;
@@ -946,6 +963,36 @@ mod tests {
             Ok(())
         }
 
+        #[tokio::test]
+        async fn test_non_partitioned_read_optimized() -> Result<()> {
+            let base_url = SampleTable::V6Nonpartitioned.url_to_mor();
+            let hudi_table = Table::new(base_url.path()).await?;
+            let commit_timestamps = hudi_table
+                .timeline
+                .completed_commits
+                .iter()
+                .map(|i| i.timestamp.as_str())
+                .collect::<Vec<_>>();
+            let latest_commit = commit_timestamps.last().unwrap();
+            let records = hudi_table
+                .read_snapshot_as_of(latest_commit, &[], true)
+                .await?;
+            let schema = &records[0].schema();
+            let records = concat_batches(schema, &records)?;
+
+            let sample_data = SampleTable::sample_data_order_by_id(&records);
+            assert_eq!(
+                sample_data,
+                vec![
+                    (1, "Alice", true), // this was updated to false in a log 
file and not to be read out
+                    (2, "Bob", false),
+                    (3, "Carol", true),
+                    (4, "Diana", true), // this was inserted in a base file 
and should be read out
+                ]
+            );
+            Ok(())
+        }
+
         #[tokio::test]
         async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
             for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
@@ -977,7 +1024,9 @@ mod tests {
                     .map(|i| i.timestamp.as_str())
                     .collect::<Vec<_>>();
                 let first_commit = commit_timestamps[0];
-                let records = hudi_table.read_snapshot_as_of(first_commit, &[]).await?;
+                let records = hudi_table
+                    .read_snapshot_as_of(first_commit, &[], false)
+                    .await?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
 
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 2c14e56..5ffbef6 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -181,11 +181,7 @@ impl TableProvider for HudiDataSource {
             .get_file_slices_splits(self.get_input_partitions(), pushdown_filters.as_slice())
             .await
             .map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
-        let base_url = self.table.base_url().map_err(|e| {
-            Execution(format!(
-                "Failed to get base path config from Hudi table: {e:?}"
-            ))
-        })?;
+        let base_url = self.table.base_url();
         let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
         for file_slice_vec in file_slices {
             let mut parquet_file_group_vec = Vec::new();
@@ -204,12 +200,7 @@ impl TableProvider for HudiDataSource {
             parquet_file_groups.push(parquet_file_group_vec)
         }
 
-        let base_url = self.table.base_url().map_err(|e| {
-            Execution(format!(
-                "Failed to get base path config from Hudi table: {}",
-                e
-            ))
-        })?;
+        let base_url = self.table.base_url();
         let url = ObjectStoreUrl::parse(get_scheme_authority(&base_url))?;
         let fsc = FileScanConfig::new(url, self.schema())
             .with_file_groups(parquet_file_groups)

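For the DataFusion integration touched above, a similar sketch (assuming the
`HudiDataSource` type is exposed as `hudi::HudiDataSource` with a
`new_with_options` constructor; the table path and registered name are
hypothetical):

    use std::sync::Arc;

    use datafusion::prelude::SessionContext;
    use hudi::HudiDataSource;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let ctx = SessionContext::new();
        // Passing the read config here makes snapshot scans of a MOR table
        // read base files only.
        let hudi = HudiDataSource::new_with_options(
            "/tmp/hudi_mor_table", // hypothetical
            [("hoodie.read.use.read_optimized.mode", "true")],
        )
        .await?;
        ctx.register_table("mor_ro", Arc::new(hudi))?;
        ctx.sql("SELECT * FROM mor_ro").await?.show().await?;
        Ok(())
    }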