This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new af997b7 feat: support MOR read-optimized query (#259)
af997b7 is described below
commit af997b772573bf2e44d81de7f0cd2a53433bc93b
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Jan 21 23:04:56 2025 -0600
feat: support MOR read-optimized query (#259)
Add a read config `hoodie.read.use.read_optimized.mode` to allow performing
read-optimized queries for MOR tables.
---
crates/core/src/config/read.rs | 72 ++++++++++++++++++++++++++++----
crates/core/src/table/mod.rs | 93 ++++++++++++++++++++++++++++++++----------
crates/datafusion/src/lib.rs | 13 +-----
3 files changed, 137 insertions(+), 41 deletions(-)
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 4f131c2..97d8a0f 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -23,7 +23,7 @@ use std::str::FromStr;
use strum_macros::EnumIter;
-use crate::config::error::ConfigError::{NotFound, ParseInt};
+use crate::config::error::ConfigError::{NotFound, ParseBool, ParseInt};
use crate::config::Result;
use crate::config::{ConfigParser, HudiConfigValue};
@@ -52,6 +52,10 @@ pub enum HudiReadConfig {
/// Parallelism for listing files on storage.
ListingParallelism,
+
+ /// When set to true, only [BaseFile]s will be read for optimized reads.
+ /// This is only applicable to Merge-On-Read (MOR) tables.
+ UseReadOptimizedMode,
}
impl AsRef<str> for HudiReadConfig {
@@ -60,6 +64,7 @@ impl AsRef<str> for HudiReadConfig {
Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
Self::InputPartitions => "hoodie.read.input.partitions",
Self::ListingParallelism => "hoodie.read.listing.parallelism",
+ Self::UseReadOptimizedMode =>
"hoodie.read.use.read_optimized.mode",
}
}
}
@@ -71,6 +76,7 @@ impl ConfigParser for HudiReadConfig {
match self {
HudiReadConfig::InputPartitions =>
Some(HudiConfigValue::UInteger(0usize)),
HudiReadConfig::ListingParallelism =>
Some(HudiConfigValue::UInteger(10usize)),
+ HudiReadConfig::UseReadOptimizedMode =>
Some(HudiConfigValue::Boolean(false)),
_ => None,
}
}
@@ -93,6 +99,11 @@ impl ConfigParser for HudiReadConfig {
usize::from_str(v).map_err(|e| ParseInt(self.key(),
v.to_string(), e))
})
.map(HudiConfigValue::UInteger),
+ Self::UseReadOptimizedMode => get_result
+ .and_then(|v| {
+ bool::from_str(v).map_err(|e| ParseBool(self.key(),
v.to_string(), e))
+ })
+ .map(HudiConfigValue::Boolean),
}
}
}
@@ -100,25 +111,70 @@ impl ConfigParser for HudiReadConfig {
#[cfg(test)]
mod tests {
use super::*;
- use crate::config::read::HudiReadConfig::InputPartitions;
+ use crate::config::read::HudiReadConfig::{
+ InputPartitions, ListingParallelism, UseReadOptimizedMode,
+ };
#[test]
fn parse_valid_config_value() {
- let options = HashMap::from([(InputPartitions.as_ref().to_string(),
"100".to_string())]);
- let value =
InputPartitions.parse_value(&options).unwrap().to::<usize>();
- assert_eq!(value, 100);
+ let options = HashMap::from([
+ (InputPartitions.as_ref().to_string(), "100".to_string()),
+ (ListingParallelism.as_ref().to_string(), "100".to_string()),
+ (
+ UseReadOptimizedMode.as_ref().to_string(),
+ "true".to_string(),
+ ),
+ ]);
+ assert_eq!(
+ InputPartitions.parse_value(&options).unwrap().to::<usize>(),
+ 100
+ );
+ assert_eq!(
+ ListingParallelism
+ .parse_value(&options)
+ .unwrap()
+ .to::<usize>(),
+ 100
+ );
+ assert!(UseReadOptimizedMode
+ .parse_value(&options)
+ .unwrap()
+ .to::<bool>());
}
#[test]
fn parse_invalid_config_value() {
- let options = HashMap::from([(InputPartitions.as_ref().to_string(),
"foo".to_string())]);
- let value = InputPartitions.parse_value(&options);
- assert!(matches!(value.unwrap_err(), ParseInt(_, _, _)));
+ let options = HashMap::from([
+ (InputPartitions.as_ref().to_string(), "foo".to_string()),
+ (ListingParallelism.as_ref().to_string(), "_100".to_string()),
+ (UseReadOptimizedMode.as_ref().to_string(), "1".to_string()),
+ ]);
+ assert!(matches!(
+ InputPartitions.parse_value(&options).unwrap_err(),
+ ParseInt(_, _, _)
+ ));
assert_eq!(
InputPartitions
.parse_value_or_default(&options)
.to::<usize>(),
0
);
+ assert!(matches!(
+ ListingParallelism.parse_value(&options).unwrap_err(),
+ ParseInt(_, _, _)
+ ));
+ assert_eq!(
+ ListingParallelism
+ .parse_value_or_default(&options)
+ .to::<usize>(),
+ 10
+ );
+ assert!(matches!(
+ UseReadOptimizedMode.parse_value(&options).unwrap_err(),
+ ParseBool(_, _, _)
+ ));
+ assert!(!UseReadOptimizedMode
+ .parse_value_or_default(&options)
+ .to::<bool>(),)
}
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 7f5a82c..07c173e 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -90,11 +90,10 @@ mod fs_view;
mod listing;
pub mod partition;
-use crate::config::read::HudiReadConfig::AsOfTimestamp;
+use crate::config::read::HudiReadConfig::{AsOfTimestamp, UseReadOptimizedMode};
use crate::config::table::HudiTableConfig::PartitionFields;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
-use crate::error::CoreError;
use crate::expr::filter::{Filter, FilterField};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::reader::FileGroupReader;
@@ -141,11 +140,25 @@ impl Table {
.await
}
- pub fn base_url(&self) -> Result<Url> {
+ #[inline]
+ pub fn base_url(&self) -> Url {
+ let err_msg = format!("{:?} is missing or invalid.",
HudiTableConfig::BasePath);
self.hudi_configs
- .get(HudiTableConfig::BasePath)?
+ .get(HudiTableConfig::BasePath)
+ .expect(&err_msg)
.to_url()
- .map_err(CoreError::from)
+ .expect(&err_msg)
+ }
+
+ #[inline]
+ pub fn table_type(&self) -> TableTypeValue {
+ let err_msg = format!("{:?} is missing or invalid.",
HudiTableConfig::TableType);
+ let table_type = self
+ .hudi_configs
+ .get(HudiTableConfig::TableType)
+ .expect(&err_msg)
+ .to::<String>();
+ TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
}
#[inline]
@@ -176,16 +189,6 @@ impl Table {
.register_object_store(runtime_env.clone());
}
- pub fn get_table_type(&self) -> TableTypeValue {
- let err_msg = format!("{:?} is missing or invalid.",
HudiTableConfig::TableType);
- let table_type = self
- .hudi_configs
- .get(HudiTableConfig::TableType)
- .expect(&err_msg)
- .to::<String>();
- TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
- }
-
/// Get the latest [Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
@@ -289,11 +292,21 @@ impl Table {
///
/// If the [AsOfTimestamp] configuration is set, the records at the
specified timestamp will be returned.
pub async fn read_snapshot(&self, filters: &[Filter]) ->
Result<Vec<RecordBatch>> {
+ let read_optimized_mode = self
+ .hudi_configs
+ .get_or_default(UseReadOptimizedMode)
+ .to::<bool>();
+
if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
- self.read_snapshot_as_of(timestamp.to::<String>().as_str(),
filters)
- .await
+ self.read_snapshot_as_of(
+ timestamp.to::<String>().as_str(),
+ filters,
+ read_optimized_mode,
+ )
+ .await
} else if let Some(timestamp) =
self.timeline.get_latest_commit_timestamp() {
- self.read_snapshot_as_of(timestamp, filters).await
+ self.read_snapshot_as_of(timestamp, filters, read_optimized_mode)
+ .await
} else {
Ok(Vec::new())
}
@@ -304,10 +317,12 @@ impl Table {
&self,
timestamp: &str,
filters: &[Filter],
+ read_optimized_mode: bool,
) -> Result<Vec<RecordBatch>> {
let file_slices = self.get_file_slices_as_of(timestamp,
filters).await?;
let fg_reader = self.create_file_group_reader();
- let base_file_only = self.get_table_type() ==
TableTypeValue::CopyOnWrite;
+ let base_file_only =
+ read_optimized_mode || self.table_type() ==
TableTypeValue::CopyOnWrite;
let timezone = self.timezone();
let instant_range = InstantRange::up_to(timestamp, &timezone);
let batches = futures::future::try_join_all(
@@ -351,7 +366,9 @@ impl Table {
];
let fg_reader =
self.create_file_group_reader_with_filters(filters,
MetaField::schema().as_ref())?;
- let base_file_only = self.get_table_type() ==
TableTypeValue::CopyOnWrite;
+
+ // Read-optimized mode does not apply to incremental query semantics.
+ let base_file_only = self.table_type() == TableTypeValue::CopyOnWrite;
let timezone = self.timezone();
let instant_range =
InstantRange::within_open_closed(start_timestamp, end_timestamp,
&timezone);
@@ -416,7 +433,7 @@ mod tests {
/// Test helper to get relative file paths from the table with filters.
async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) ->
Result<Vec<String>> {
let mut file_paths = Vec::new();
- let base_url = table.base_url()?;
+ let base_url = table.base_url();
for f in table.get_file_slices(filters).await? {
let relative_path = f.base_file_relative_path()?;
let file_url = join_url_segments(&base_url,
&[relative_path.as_str()])?;
@@ -946,6 +963,36 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_non_partitioned_read_optimized() -> Result<()> {
+ let base_url = SampleTable::V6Nonpartitioned.url_to_mor();
+ let hudi_table = Table::new(base_url.path()).await?;
+ let commit_timestamps = hudi_table
+ .timeline
+ .completed_commits
+ .iter()
+ .map(|i| i.timestamp.as_str())
+ .collect::<Vec<_>>();
+ let latest_commit = commit_timestamps.last().unwrap();
+ let records = hudi_table
+ .read_snapshot_as_of(latest_commit, &[], true)
+ .await?;
+ let schema = &records[0].schema();
+ let records = concat_batches(schema, &records)?;
+
+ let sample_data = SampleTable::sample_data_order_by_id(&records);
+ assert_eq!(
+ sample_data,
+ vec![
+ (1, "Alice", true), // this was updated to false in a log
file and not to be read out
+ (2, "Bob", false),
+ (3, "Carol", true),
+ (4, "Diana", true), // this was inserted in a base file
and should be read out
+ ]
+ );
+ Ok(())
+ }
+
#[tokio::test]
async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
@@ -977,7 +1024,9 @@ mod tests {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let first_commit = commit_timestamps[0];
- let records = hudi_table.read_snapshot_as_of(first_commit,
&[]).await?;
+ let records = hudi_table
+ .read_snapshot_as_of(first_commit, &[], false)
+ .await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 2c14e56..5ffbef6 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -181,11 +181,7 @@ impl TableProvider for HudiDataSource {
.get_file_slices_splits(self.get_input_partitions(),
pushdown_filters.as_slice())
.await
.map_err(|e| Execution(format!("Failed to get file slices from
Hudi table: {}", e)))?;
- let base_url = self.table.base_url().map_err(|e| {
- Execution(format!(
- "Failed to get base path config from Hudi table: {e:?}"
- ))
- })?;
+ let base_url = self.table.base_url();
let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
for file_slice_vec in file_slices {
let mut parquet_file_group_vec = Vec::new();
@@ -204,12 +200,7 @@ impl TableProvider for HudiDataSource {
parquet_file_groups.push(parquet_file_group_vec)
}
- let base_url = self.table.base_url().map_err(|e| {
- Execution(format!(
- "Failed to get base path config from Hudi table: {}",
- e
- ))
- })?;
+ let base_url = self.table.base_url();
let url = ObjectStoreUrl::parse(get_scheme_authority(&base_url))?;
let fsc = FileScanConfig::new(url, self.schema())
.with_file_groups(parquet_file_groups)