This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new f1c2818  feat: add config validation when creating table (#49)
f1c2818 is described below

commit f1c2818d212e3bca37c08c883508953875535718
Author: Shiyan Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Fri Jul 5 23:54:56 2024 -0500

    feat: add config validation when creating table (#49)
    
    Guard table creation by validating all recognizable Hudi configs.
    
    Fixes #40
---
 crates/core/Cargo.toml            |   1 +
 crates/core/src/config/read.rs    |   6 +-
 crates/core/src/config/table.rs   |   9 +--
 crates/core/src/storage/mod.rs    |  25 ++++----
 crates/core/src/table/fs_view.rs  |   6 +-
 crates/core/src/table/mod.rs      | 128 ++++++++++++++++++++++++++++++++------
 crates/core/src/table/timeline.rs |   2 +-
 7 files changed, 132 insertions(+), 45 deletions(-)

diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index 06e15a4..5e94cf3 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -61,6 +61,7 @@ bytes = { workspace = true }
 chrono = { workspace = true, default-features = false, features = ["clock"] }
 hashbrown = "0.14.3"
 regex = { workspace = true }
+strum = { workspace = true }
 strum_macros = { workspace = true }
 uuid = { workspace = true, features = ["serde", "v4"] }
 url = { workspace = true }
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 43879de..2af57e8 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -20,11 +20,11 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 
-use anyhow::{anyhow, Result};
-
 use crate::config::{ConfigParser, HudiConfigValue};
+use anyhow::{anyhow, Result};
+use strum_macros::EnumIter;
 
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiReadConfig {
     InputPartitions,
 }
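
Deriving strum's `EnumIter` here (and on HudiTableConfig below) is what lets
the new validation walk every recognized config variant generically. A minimal
standalone sketch of the mechanism, assuming strum and strum_macros as
dependencies:

    use strum::IntoEnumIterator; // provides `iter()` for EnumIter-derived enums
    use strum_macros::EnumIter;

    #[derive(Debug, EnumIter)]
    enum HudiReadConfig {
        InputPartitions,
    }

    fn main() {
        // Every variant is visited without hand-maintaining a list.
        for conf in HudiReadConfig::iter() {
            println!("{:?}", conf);
        }
    }
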
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index d503e32..a23ecbd 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -22,11 +22,11 @@ use std::str::FromStr;
 
 use anyhow::anyhow;
 use anyhow::Result;
-use strum_macros::AsRefStr;
+use strum_macros::{AsRefStr, EnumIter};
 
 use crate::config::{ConfigParser, HudiConfigValue};
 
-#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiTableConfig {
     BaseFileFormat,
     Checksum,
@@ -80,10 +80,7 @@ impl ConfigParser for HudiTableConfig {
     }
 
     fn is_required(&self) -> bool {
-        matches!(
-            self,
-            Self::BaseFileFormat | Self::TableName | Self::TableType | Self::TableVersion
-        )
+        matches!(self, Self::TableName | Self::TableType | Self::TableVersion)
     }
 
     fn parse_value(&self, configs: &HashMap<String, String>) -> Result<Self::Output> {
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 42cb147..374f334 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -41,21 +41,18 @@ pub mod file_info;
 pub mod file_stats;
 pub mod utils;
 
-#[allow(dead_code)]
 #[derive(Clone, Debug)]
 pub struct Storage {
     base_url: Arc<Url>,
-    options: Arc<HashMap<String, String>>,
     object_store: Arc<dyn ObjectStore>,
 }
 
 impl Storage {
-    pub fn new(base_url: Arc<Url>, options: Arc<HashMap<String, String>>) -> Result<Arc<Storage>> {
-        match parse_url_opts(&base_url, &*options) {
-            Ok(object_store) => Ok(Arc::new(Storage {
+    pub fn new(base_url: Arc<Url>, options: &HashMap<String, String>) -> Result<Arc<Storage>> {
+        match parse_url_opts(&base_url, options) {
+            Ok((object_store, _)) => Ok(Arc::new(Storage {
                 base_url,
-                options,
-                object_store: Arc::new(object_store.0),
+                object_store: Arc::new(object_store),
             })),
             Err(e) => Err(anyhow!("Failed to create storage: {}", e)),
         }
@@ -216,7 +213,7 @@ mod tests {
             canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let first_level_dirs: HashSet<String> =
             storage.list_dirs(None).await.unwrap().into_iter().collect();
         assert_eq!(
@@ -238,7 +235,7 @@ mod tests {
             canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let first_level_dirs: HashSet<ObjPath> = storage
             .list_dirs_as_obj_paths(None)
             .await
@@ -261,7 +258,7 @@ mod tests {
             canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let file_info_1: Vec<FileInfo> = storage
             .list_files(None)
             .await
@@ -320,7 +317,7 @@ mod tests {
             canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
         )
         .unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
         assert_eq!(
             leaf_dirs,
@@ -333,7 +330,7 @@ mod tests {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("tests/data/leaf_dir")).unwrap())
                 .unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
         assert_eq!(
             leaf_dirs,
@@ -346,7 +343,7 @@ mod tests {
     async fn storage_get_file_info() {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let file_info = storage.get_file_info("a.parquet").await.unwrap();
         assert_eq!(file_info.name, "a.parquet");
         assert_eq!(
@@ -360,7 +357,7 @@ mod tests {
     async fn storage_get_parquet_file_data() {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let file_data = storage.get_parquet_file_data("a.parquet").await.unwrap();
         assert_eq!(file_data.num_rows(), 5);
     }
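
For context, `object_store::parse_url_opts` yields an `(ObjectStore, Path)`
pair, which the new `Storage::new` destructures instead of taking `.0`, and
options are now borrowed rather than stored. A minimal sketch of that call,
assuming the object_store and url crates as dependencies:

    use std::collections::HashMap;

    use object_store::parse_url_opts;
    use url::Url;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let url = Url::parse("file:///tmp")?;
        let options: HashMap<String, String> = HashMap::new();
        // Destructure the (store, path) pair; the path part is unused here.
        let (store, _path) = parse_url_opts(&url, &options)?;
        println!("{}", store);
        Ok(())
    }
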
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 02d3ca4..bdc05fc 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -44,7 +44,7 @@ impl FileSystemView {
         storage_options: Arc<HashMap<String, String>>,
         configs: Arc<HudiConfigs>,
     ) -> Result<Self> {
-        let storage = Storage::new(base_url, storage_options)?;
+        let storage = Storage::new(base_url, &storage_options)?;
         let partition_paths = Self::load_partition_paths(&storage).await?;
         let partition_to_file_groups =
             Self::load_file_groups_for_partitions(&storage, partition_paths).await?;
@@ -176,7 +176,7 @@ mod tests {
     #[tokio::test]
     async fn get_partition_paths_for_nonpartitioned_table() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let partition_paths = FileSystemView::load_partition_paths(&storage)
             .await
             .unwrap();
@@ -188,7 +188,7 @@ mod tests {
     #[tokio::test]
     async fn get_partition_paths_for_complexkeygen_table() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
-        let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
+        let storage = Storage::new(Arc::new(base_url), &HashMap::new()).unwrap();
         let partition_paths = FileSystemView::load_partition_paths(&storage)
             .await
             .unwrap();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index e09fd10..1493394 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -19,13 +19,20 @@
 
 use std::collections::HashMap;
 use std::io::{BufRead, BufReader};
+use std::str::FromStr;
 use std::sync::Arc;
 
 use anyhow::{anyhow, Context, Result};
 use arrow::record_batch::RecordBatch;
 use arrow_schema::Schema;
+use strum::IntoEnumIterator;
 use url::Url;
 
+use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
+use TableTypeValue::CopyOnWrite;
+
+use crate::config::read::HudiReadConfig;
+use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
 use crate::file_group::FileSlice;
 use crate::storage::utils::parse_uri;
@@ -39,49 +46,57 @@ mod timeline;
 #[derive(Clone, Debug)]
 pub struct Table {
     pub base_url: Arc<Url>,
-    pub storage_options: Arc<HashMap<String, String>>,
     pub configs: Arc<HudiConfigs>,
+    pub extra_options: Arc<HashMap<String, String>>,
     pub timeline: Timeline,
     pub file_system_view: FileSystemView,
 }
 
 impl Table {
-    pub async fn new(base_uri: &str, storage_options: HashMap<String, String>) -> Result<Self> {
+    pub async fn new(base_uri: &str, all_options: HashMap<String, String>) -> Result<Self> {
         let base_url = Arc::new(parse_uri(base_uri)?);
-        let storage_options = Arc::new(storage_options);
 
-        let configs = Self::load_properties(base_url.clone(), storage_options.clone())
+        let (configs, extra_options) = Self::load_configs(base_url.clone(), &all_options)
             .await
             .context("Failed to load table properties")?;
         let configs = Arc::new(configs);
+        let extra_options = Arc::new(extra_options);
 
-        let timeline = Timeline::new(base_url.clone(), storage_options.clone(), configs.clone())
+        let timeline = Timeline::new(base_url.clone(), extra_options.clone(), configs.clone())
             .await
             .context("Failed to load timeline")?;
 
         let file_system_view =
-            FileSystemView::new(base_url.clone(), storage_options.clone(), configs.clone())
+            FileSystemView::new(base_url.clone(), extra_options.clone(), configs.clone())
                 .await
                 .context("Failed to load file system view")?;
 
         Ok(Table {
             base_url,
-            storage_options,
             configs,
+            extra_options,
             timeline,
             file_system_view,
         })
     }
 
-    async fn load_properties(
+    async fn load_configs(
         base_url: Arc<Url>,
-        storage_options: Arc<HashMap<String, String>>,
-    ) -> Result<HudiConfigs> {
-        let storage = Storage::new(base_url, storage_options)?;
+        all_options: &HashMap<String, String>,
+    ) -> Result<(HudiConfigs, HashMap<String, String>)> {
+        let mut hudi_options = HashMap::new();
+        let mut extra_options = HashMap::new();
+        for (k, v) in all_options {
+            if k.starts_with("hoodie.") {
+                hudi_options.insert(k.clone(), v.clone());
+            } else {
+                extra_options.insert(k.clone(), v.clone());
+            }
+        }
+        let storage = Storage::new(base_url, &extra_options)?;
         let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
         let cursor = std::io::Cursor::new(data);
         let lines = BufReader::new(cursor).lines();
-        let mut properties: HashMap<String, String> = HashMap::new();
         for line in lines {
             let line = line?;
             let trimmed_line = line.trim();
@@ -91,9 +106,54 @@ impl Table {
             let mut parts = trimmed_line.splitn(2, '=');
             let key = parts.next().unwrap().to_owned();
             let value = parts.next().unwrap_or("").to_owned();
-            properties.insert(key, value);
+            // `hoodie.properties` takes precedence TODO handle conflicts where applicable
+            hudi_options.insert(key, value);
+        }
+        let hudi_configs = HudiConfigs::new(hudi_options);
+
+        Self::validate_configs(&hudi_configs, &extra_options).map(|_| (hudi_configs, extra_options))
+    }
+
+    fn validate_configs(
+        hudi_configs: &HudiConfigs,
+        extra_options: &HashMap<String, String>,
+    ) -> Result<()> {
+        if extra_options
+            .get("hoodie_internal.skip.config.validation")
+            .and_then(|v| bool::from_str(v).ok())
+            .unwrap_or(false)
+        {
+            return Ok(());
+        }
+
+        for conf in HudiTableConfig::iter() {
+            hudi_configs.validate(conf)?
         }
-        Ok(HudiConfigs::new(properties))
+
+        for conf in HudiReadConfig::iter() {
+            hudi_configs.validate(conf)?
+        }
+
+        // additional validation
+        let table_type = hudi_configs.get(TableType)?.to::<String>();
+        if TableTypeValue::from_str(&table_type)? != CopyOnWrite {
+            return Err(anyhow!("Only support copy-on-write table."));
+        }
+
+        let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
+        if !(5..=6).contains(&table_version) {
+            return Err(anyhow!("Only support table version 5 and 6."));
+        }
+
+        let drops_partition_cols = hudi_configs.get(DropsPartitionFields)?.to::<bool>();
+        if drops_partition_cols {
+            return Err(anyhow!(
+                "Only support when `{}` is disabled",
+                DropsPartitionFields.as_ref()
+            ));
+        }
+
+        Ok(())
     }
 
     pub async fn get_schema(&self) -> Result<Schema> {
@@ -326,7 +386,15 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
                 .unwrap();
-        let table = Table::new(base_url.as_str(), HashMap::new()).await.unwrap();
+        let table = Table::new(
+            base_url.as_str(),
+            HashMap::from_iter(vec![(
+                "hoodie_internal.skip.config.validation".to_string(),
+                "true".to_string(),
+            )]),
+        )
+        .await
+        .unwrap();
         let configs = table.configs;
         assert!(
             configs.validate(BaseFileFormat).is_err(),
@@ -377,7 +445,15 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
                 .unwrap();
-        let table = Table::new(base_url.as_str(), HashMap::new()).await.unwrap();
+        let table = Table::new(
+            base_url.as_str(),
+            HashMap::from_iter(vec![(
+                "hoodie_internal.skip.config.validation".to_string(),
+                "true".to_string(),
+            )]),
+        )
+        .await
+        .unwrap();
         let configs = table.configs;
         assert!(configs.get(BaseFileFormat).is_err());
         assert!(configs.get(Checksum).is_err());
@@ -401,7 +477,15 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
                 .unwrap();
-        let table = Table::new(base_url.as_str(), HashMap::new()).await.unwrap();
+        let table = Table::new(
+            base_url.as_str(),
+            HashMap::from_iter(vec![(
+                "hoodie_internal.skip.config.validation".to_string(),
+                "true".to_string(),
+            )]),
+        )
+        .await
+        .unwrap();
         let configs = table.configs;
         assert!(panic::catch_unwind(|| configs.get_or_default(BaseFileFormat)).is_err());
         assert!(panic::catch_unwind(|| configs.get_or_default(Checksum)).is_err());
@@ -431,7 +515,15 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_valid")).unwrap())
                 .unwrap();
-        let table = Table::new(base_url.as_str(), HashMap::new()).await.unwrap();
+        let table = Table::new(
+            base_url.as_str(),
+            HashMap::from_iter(vec![(
+                "hoodie_internal.skip.config.validation".to_string(),
+                "true".to_string(),
+            )]),
+        )
+        .await
+        .unwrap();
         let configs = table.configs;
         assert_eq!(
             configs.get(BaseFileFormat).unwrap().to::<String>(),
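
On the option routing above: `load_configs` splits the caller's map on the
literal prefix "hoodie.", and the internal escape hatch is spelled
`hoodie_internal.` (underscore, not dot), so it lands in `extra_options`,
which is exactly where `validate_configs` looks it up. A self-contained sketch
of the splitting rule:

    use std::collections::HashMap;

    fn main() {
        let all_options = HashMap::from([
            ("hoodie.table.name".to_string(), "trips".to_string()),
            (
                "hoodie_internal.skip.config.validation".to_string(),
                "true".to_string(),
            ),
        ]);
        let mut hudi_options = HashMap::new();
        let mut extra_options = HashMap::new();
        for (k, v) in &all_options {
            if k.starts_with("hoodie.") {
                hudi_options.insert(k.clone(), v.clone());
            } else {
                extra_options.insert(k.clone(), v.clone());
            }
        }
        assert!(hudi_options.contains_key("hoodie.table.name"));
        // The underscore-prefixed flag is not a "hoodie." key:
        assert!(extra_options.contains_key("hoodie_internal.skip.config.validation"));
    }
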
diff --git a/crates/core/src/table/timeline.rs b/crates/core/src/table/timeline.rs
index d2d476d..9010738 100644
--- a/crates/core/src/table/timeline.rs
+++ b/crates/core/src/table/timeline.rs
@@ -88,7 +88,7 @@ impl Timeline {
         storage_options: Arc<HashMap<String, String>>,
         configs: Arc<HudiConfigs>,
     ) -> Result<Self> {
-        let storage = Storage::new(base_url, storage_options)?;
+        let storage = Storage::new(base_url, &storage_options)?;
         let instants = Self::load_completed_commit_instants(&storage).await?;
         Ok(Self {
             storage,
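
For callers, the net effect of this change is that table creation now fails
fast unless the table is copy-on-write, its version is 5 or 6, and
DropsPartitionFields is disabled. A hypothetical caller-side sketch (the
`hudi_core` crate path is an assumption; adjust to your build):

    use std::collections::HashMap;

    use hudi_core::table::Table; // crate path assumed

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        // Validation runs by default; an unsupported table type, version,
        // or drop-partition-fields setting surfaces as an Err here.
        let table = Table::new("file:///tmp/hudi_table", HashMap::new()).await?;
        println!("table at {}", table.base_url);
        Ok(())
    }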
