This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
     new f1c2818  feat: add config validation when creating table (#49)
f1c2818 is described below

commit f1c2818d212e3bca37c08c883508953875535718
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Fri Jul 5 23:54:56 2024 -0500

    feat: add config validation when creating table (#49)

    Guard table creation by validating all recognizable Hudi configs.

    Fixes #40
---
 crates/core/Cargo.toml            |   1 +
 crates/core/src/config/read.rs    |   6 +-
 crates/core/src/config/table.rs   |   9 +--
 crates/core/src/storage/mod.rs    |  25 ++++----
 crates/core/src/table/fs_view.rs  |   6 +-
 crates/core/src/table/mod.rs      | 128 ++++++++++++++++++++++++++++++++------
 crates/core/src/table/timeline.rs |   2 +-
 7 files changed, 132 insertions(+), 45 deletions(-)

diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 06e15a4..5e94cf3 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -61,6 +61,7 @@ bytes = { workspace = true }
 chrono = { workspace = true, default-features = false, features = ["clock"] }
 hashbrown = "0.14.3"
 regex = { workspace = true }
+strum = { workspace = true }
 strum_macros = { workspace = true }
 uuid = { workspace = true, features = ["serde", "v4"] }
 url = { workspace = true }
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 43879de..2af57e8 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -20,11 +20,11 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 
-use anyhow::{anyhow, Result};
-
 use crate::config::{ConfigParser, HudiConfigValue};
+use anyhow::{anyhow, Result};
+use strum_macros::EnumIter;
 
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiReadConfig {
     InputPartitions,
 }
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index d503e32..a23ecbd 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -22,11 +22,11 @@ use std::str::FromStr;
 
 use anyhow::anyhow;
 use anyhow::Result;
-use strum_macros::AsRefStr;
+use strum_macros::{AsRefStr, EnumIter};
 
 use crate::config::{ConfigParser, HudiConfigValue};
 
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiTableConfig {
     BaseFileFormat,
     Checksum,
@@ -80,10 +80,7 @@ impl ConfigParser for HudiTableConfig {
     }
 
     fn is_required(&self) -> bool {
-        matches!(
-            self,
-            Self::BaseFileFormat | Self::TableName | Self::TableType | Self::TableVersion
-        )
+        matches!(self, Self::TableName | Self::TableType | Self::TableVersion)
     }
 
     fn parse_value(&self, configs: &HashMap<String, String>) -> Result<Self::Output> {
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 42cb147..374f334 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -41,21 +41,18 @@ pub mod file_info;
 pub mod file_stats;
 pub mod utils;
 
-#[allow(dead_code)]
 #[derive(Clone, Debug)]
 pub struct Storage {
     base_url: Arc<Url>,
-    options: Arc<HashMap<String, String>>,
     object_store: Arc<dyn ObjectStore>,
 }
 
 impl Storage {
-    pub fn new(base_url: Arc<Url>, options: Arc<HashMap<String, String>>) -> Result<Arc<Storage>> {
-        match parse_url_opts(&base_url, &*options) {
-            Ok(object_store) => Ok(Arc::new(Storage {
+    pub fn new(base_url: Arc<Url>, options: &HashMap<String, String>) -> Result<Arc<Storage>> {
+        match parse_url_opts(&base_url, options) {
+            Ok((object_store, _)) => Ok(Arc::new(Storage {
                 base_url,
-                options,
-                object_store: Arc::new(object_store.0),
+                object_store: Arc::new(object_store),
             })),
             Err(e) => Err(anyhow!("Failed to create storage: {}", e)),
         }
@@ -216,7 +213,7 @@ mod tests {
             canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let first_level_dirs: HashSet<String> =
             storage.list_dirs(None).await.unwrap().into_iter().collect();
         assert_eq!(
@@ -238,7 +235,7 @@ mod tests {
             canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let first_level_dirs: HashSet<ObjPath> = storage
             .list_dirs_as_obj_paths(None)
             .await
@@ -261,7 +258,7 @@ mod tests {
             canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
        )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let file_info_1: Vec<FileInfo> = storage
             .list_files(None)
             .await
@@ -320,7 +317,7 @@ mod tests {
             canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
         assert_eq!(
             leaf_dirs,
@@ -333,7 +330,7 @@ mod tests {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("tests/data/leaf_dir")).unwrap())
                 .unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
         assert_eq!(
             leaf_dirs,
@@ -346,7 +343,7 @@
     async fn storage_get_file_info() {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let file_info = storage.get_file_info("a.parquet").await.unwrap();
         assert_eq!(file_info.name, "a.parquet");
         assert_eq!(
@@ -360,7 +357,7 @@
     async fn storage_get_parquet_file_data() {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let file_data = storage.get_parquet_file_data("a.parquet").await.unwrap();
         assert_eq!(file_data.num_rows(), 5);
     }
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 02d3ca4..bdc05fc 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -44,7 +44,7 @@ impl FileSystemView {
         storage_options: Arc<HashMap<String, String>>,
         configs: Arc<HudiConfigs>,
     ) -> Result<Self> {
-        let storage = Storage::new(base_url, storage_options)?;
+        let storage = Storage::new(base_url, &storage_options)?;
         let partition_paths = Self::load_partition_paths(&storage).await?;
         let partition_to_file_groups =
             Self::load_file_groups_for_partitions(&storage, partition_paths).await?;
@@ -176,7 +176,7 @@ mod tests {
     #[tokio::test]
     async fn get_partition_paths_for_nonpartitioned_table() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let partition_paths = FileSystemView::load_partition_paths(&storage)
             .await
             .unwrap();
@@ -188,7 +188,7 @@ mod tests {
     #[tokio::test]
     async fn get_partition_paths_for_complexkeygen_table() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let partition_paths = FileSystemView::load_partition_paths(&storage)
             .await
             .unwrap();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index e09fd10..1493394 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -19,13 +19,20 @@
 
 use std::collections::HashMap;
 use std::io::{BufRead, BufReader};
+use std::str::FromStr;
 use std::sync::Arc;
 
 use anyhow::{anyhow, Context, Result};
 use arrow::record_batch::RecordBatch;
 use arrow_schema::Schema;
+use strum::IntoEnumIterator;
 use url::Url;
 
+use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
+use TableTypeValue::CopyOnWrite;
+
+use crate::config::read::HudiReadConfig;
+use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
 use crate::file_group::FileSlice;
 use crate::storage::utils::parse_uri;
@@ -39,49 +46,57 @@ mod timeline;
 #[derive(Clone, Debug)]
 pub struct Table {
     pub base_url: Arc<Url>,
-    pub storage_options: Arc<HashMap<String, String>>,
     pub configs: Arc<HudiConfigs>,
+    pub extra_options: Arc<HashMap<String, String>>,
     pub timeline: Timeline,
     pub file_system_view: FileSystemView,
 }
 
 impl Table {
-    pub async fn new(base_uri: &str, storage_options: HashMap<String, String>) -> Result<Self> {
+    pub async fn new(base_uri: &str, all_options: HashMap<String, String>) -> Result<Self> {
         let base_url = Arc::new(parse_uri(base_uri)?);
-        let storage_options = Arc::new(storage_options);
 
-        let configs = Self::load_properties(base_url.clone(), storage_options.clone())
+        let (configs, extra_options) = Self::load_configs(base_url.clone(), &all_options)
            .await
            .context("Failed to load table properties")?;
         let configs = Arc::new(configs);
+        let extra_options = Arc::new(extra_options);
 
-        let timeline = Timeline::new(base_url.clone(), storage_options.clone(), configs.clone())
+        let timeline = Timeline::new(base_url.clone(), extra_options.clone(), configs.clone())
             .await
             .context("Failed to load timeline")?;
 
         let file_system_view =
-            FileSystemView::new(base_url.clone(), storage_options.clone(), configs.clone())
+            FileSystemView::new(base_url.clone(), extra_options.clone(), configs.clone())
                 .await
                 .context("Failed to load file system view")?;
 
         Ok(Table {
             base_url,
-            storage_options,
             configs,
+            extra_options,
             timeline,
             file_system_view,
         })
     }
 
-    async fn load_properties(
+    async fn load_configs(
         base_url: Arc<Url>,
-        storage_options: Arc<HashMap<String, String>>,
-    ) -> Result<HudiConfigs> {
-        let storage = Storage::new(base_url, storage_options)?;
+        all_options: &HashMap<String, String>,
+    ) -> Result<(HudiConfigs, HashMap<String, String>)> {
+        let mut hudi_options = HashMap::new();
+        let mut extra_options = HashMap::new();
+        for (k, v) in all_options {
+            if k.starts_with("hoodie.") {
+                hudi_options.insert(k.clone(), v.clone());
+            } else {
+                extra_options.insert(k.clone(), v.clone());
+            }
+        }
+        let storage = Storage::new(base_url, &extra_options)?;
         let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
         let cursor = std::io::Cursor::new(data);
         let lines = BufReader::new(cursor).lines();
-        let mut properties: HashMap<String, String> = HashMap::new();
         for line in lines {
             let line = line?;
             let trimmed_line = line.trim();
@@ -91,9 +106,54 @@ impl Table {
             let mut parts = trimmed_line.splitn(2, '=');
             let key = parts.next().unwrap().to_owned();
             let value = parts.next().unwrap_or("").to_owned();
-            properties.insert(key, value);
+            // `hoodie.properties` takes precedence TODO handle conflicts where applicable
+            hudi_options.insert(key, value);
         }
+        let hudi_configs = HudiConfigs::new(hudi_options);
+
+        Self::validate_configs(&hudi_configs, &extra_options).map(|_| (hudi_configs, extra_options))
+    }
+
+    fn validate_configs(
+        hudi_configs: &HudiConfigs,
+        extra_options: &HashMap<String, String>,
+    ) -> Result<()> {
+        if extra_options
+            .get("hoodie_internal.skip.config.validation")
+            .and_then(|v| bool::from_str(v).ok())
+            .unwrap_or(false)
+        {
+            return Ok(());
+        }
+
+        for conf in HudiTableConfig::iter() {
+            hudi_configs.validate(conf)?
         }
-        Ok(HudiConfigs::new(properties))
+
+        for conf in HudiReadConfig::iter() {
+            hudi_configs.validate(conf)?
+        }
+
+        // additional validation
+        let table_type = hudi_configs.get(TableType)?.to::<String>();
+        if TableTypeValue::from_str(&table_type)? != CopyOnWrite {
+            return Err(anyhow!("Only support copy-on-write table."));
+        }
+
+        let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
+        if !(5..=6).contains(&table_version) {
+            return Err(anyhow!("Only support table version 5 and 6."));
+        }
+
+        let drops_partition_cols = hudi_configs.get(DropsPartitionFields)?.to::<bool>();
+        if drops_partition_cols {
+            return Err(anyhow!(
+                "Only support when `{}` is disabled",
+                DropsPartitionFields.as_ref()
+            ));
+        }
+
+        Ok(())
     }
 
     pub async fn get_schema(&self) -> Result<Schema> {
@@ -326,7 +386,15 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
                 .unwrap();
-        let table = Table::new(base_url.as_str(), HashMap::new()).await.unwrap();
+        let table = Table::new(
+            base_url.as_str(),
+            HashMap::from_iter(vec![(
+                "hoodie_internal.skip.config.validation".to_string(),
+                "true".to_string(),
+            )]),
+        )
+        .await
+        .unwrap();
         let configs = table.configs;
         assert!(
             configs.validate(BaseFileFormat).is_err(),
@@ -377,7 +445,15 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
                 .unwrap();
-        let table = Table::new(base_url.as_str(), HashMap::new()).await.unwrap();
+        let table = Table::new(
+            base_url.as_str(),
+            HashMap::from_iter(vec![(
+                "hoodie_internal.skip.config.validation".to_string(),
+                "true".to_string(),
+            )]),
+        )
+        .await
+        .unwrap();
         let configs = table.configs;
         assert!(configs.get(BaseFileFormat).is_err());
         assert!(configs.get(Checksum).is_err());
@@ -401,7 +477,15 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
                 .unwrap();
-        let table = Table::new(base_url.as_str(), HashMap::new()).await.unwrap();
+        let table = Table::new(
+            base_url.as_str(),
+            HashMap::from_iter(vec![(
+                "hoodie_internal.skip.config.validation".to_string(),
+                "true".to_string(),
+            )]),
+        )
+        .await
+        .unwrap();
         let configs = table.configs;
         assert!(panic::catch_unwind(|| configs.get_or_default(BaseFileFormat)).is_err());
         assert!(panic::catch_unwind(|| configs.get_or_default(Checksum)).is_err());
@@ -431,7 +515,15 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_valid")).unwrap())
                 .unwrap();
-        let table = Table::new(base_url.as_str(), HashMap::new()).await.unwrap();
+        let table = Table::new(
+            base_url.as_str(),
+            HashMap::from_iter(vec![(
+                "hoodie_internal.skip.config.validation".to_string(),
+                "true".to_string(),
+            )]),
+        )
+        .await
+        .unwrap();
         let configs = table.configs;
         assert_eq!(
             configs.get(BaseFileFormat).unwrap().to::<String>(),
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index d2d476d..9010738 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -88,7 +88,7 @@ impl Timeline {
         storage_options: Arc<HashMap<String, String>>,
         configs: Arc<HudiConfigs>,
     ) -> Result<Self> {
-        let storage = Storage::new(base_url, storage_options)?;
+        let storage = Storage::new(base_url, &storage_options)?;
         let instants = Self::load_completed_commit_instants(&storage).await?;
         Ok(Self {
             storage,
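
For readers following along, a minimal sketch of how the new guard surfaces to callers; it is not part of the commit. The import path `hudi_core::table::Table` and the table location are assumptions, while the skip flag is the internal option exercised by the tests in this change:

    use std::collections::HashMap;

    // Import path assumed from crates/core/src/table/mod.rs; adjust to the crate's re-exports.
    use hudi_core::table::Table;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Default path: Table::new splits "hoodie."-prefixed options from extra
        // (e.g. storage) options, merges hoodie.properties on top, then validates
        // every recognizable config. Non copy-on-write tables, versions outside
        // 5..=6, and tables with DropsPartitionFields enabled are rejected.
        let table = Table::new("/tmp/hudi_table", HashMap::new()).await?;
        println!("{:?}", table.get_schema().await?);

        // Escape hatch used by the tests: an internal, non-"hoodie."-prefixed
        // option that makes validate_configs return Ok(()) immediately.
        let opts = HashMap::from_iter(vec![(
            "hoodie_internal.skip.config.validation".to_string(),
            "true".to_string(),
        )]);
        let _unchecked = Table::new("/tmp/hudi_table", opts).await?;
        Ok(())
    }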