This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5dc57643a feat(sqllogictest): use serde derived structs for schedule
parsing (#1953)
5dc57643a is described below
commit 5dc57643aebbe1be3ac36e29581fa7462bfdd2cd
Author: Andrea Bozzo <[email protected]>
AuthorDate: Thu Dec 25 01:45:35 2025 +0100
feat(sqllogictest): use serde derived structs for schedule parsing (#1953)
This PR refactors the schedule file parsing in the sqllogictest crate to
use serde-derived structs instead of manual TOML parsing, as requested
in #1952.
### Changes
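- **New types with serde derives:**
- `ScheduleConfig` - top-level configuration parsed from TOML (an `engines`
map from engine name to `EngineConfig`, plus the list of `steps`)
- `EngineConfig` - per-engine configuration, an internally tagged enum
(`#[serde(tag = "type", rename_all = "lowercase")]`) whose variants are the
supported engine types; currently only `Datafusion`
- `DatafusionCatalogConfig` - the optional `catalog` table of the DataFusion
engine: a catalog `type` plus a `props` string map
- Note: the merged code does not use `#[serde(flatten)]`, an
`EngineConfig.extra` field, or a separate `EngineType` enum; the engine
type is the enum tag itself.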
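- **Refactored parsing flow:**
- `Schedule::from_file()` now uses `toml::from_str()` directly
- Added `instantiate_engines()` to separate parsing from engine creation
- Removed manual `parse_engines()` and `parse_steps()` functions
- `load_engine_runner()` now takes an `EngineConfig` instead of a type
string plus a raw TOML table, and `DataFusionEngine::new()` now takes an
`Option<DatafusionCatalogConfig>`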
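- **Forward-compatibility:**
- The optional `[engines.<name>.catalog]` table (`type` and `props`) is
already deserialized into `DatafusionCatalogConfig` and passed to
`DataFusionEngine::new()`; `create_catalog()` still ignores it and builds
a memory catalog (TODO, see #1780)
- This enables PR #1943 to add catalog type and catalog properties
support by consuming that typed configuration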
### Relation to #1943
This PR was suggested by @liurenjie1024 as a prerequisite to #1943
(dynamic catalog configuration). Because the catalog configuration is a
typed `catalog` field on `EngineConfig::Datafusion`, #1943 can read it
directly without modifying the parsing logic.
### Testing
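- Existing tests were ported to the new serde-based API and pass; an
unknown engine `type` is now rejected during deserialization
(`test_deserialize_invalid_engine_type`) rather than in
`load_engine_runner()`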
- Added new unit tests for deserialization behavior
- Integration test with `df_test.toml` passes unchanged
Closes #1952
---
crates/sqllogictest/src/engine/datafusion.rs | 18 ++-
crates/sqllogictest/src/engine/mod.rs | 96 +++++++++----
crates/sqllogictest/src/schedule.rs | 198 +++++++++++++++++----------
3 files changed, 207 insertions(+), 105 deletions(-)
diff --git a/crates/sqllogictest/src/engine/datafusion.rs
b/crates/sqllogictest/src/engine/datafusion.rs
index e3402dfa9..e9f93287d 100644
--- a/crates/sqllogictest/src/engine/datafusion.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -27,9 +27,8 @@ use iceberg::spec::{NestedField, PrimitiveType, Schema,
Transform, Type, Unbound
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation};
use iceberg_datafusion::IcebergCatalogProvider;
use indicatif::ProgressBar;
-use toml::Table as TomlTable;
-use crate::engine::{EngineRunner, run_slt_with_runner};
+use crate::engine::{DatafusionCatalogConfig, EngineRunner,
run_slt_with_runner};
use crate::error::Result;
pub struct DataFusionEngine {
@@ -59,12 +58,15 @@ impl EngineRunner for DataFusionEngine {
}
impl DataFusionEngine {
- pub async fn new(config: TomlTable) -> Result<Self> {
+ pub async fn new(catalog_config: Option<DatafusionCatalogConfig>) ->
Result<Self> {
let session_config = SessionConfig::new()
.with_target_partitions(4)
.with_information_schema(true);
let ctx = SessionContext::new_with_config(session_config);
- ctx.register_catalog("default", Self::create_catalog(&config).await?);
+ ctx.register_catalog(
+ "default",
+ Self::create_catalog(catalog_config.as_ref()).await?,
+ );
Ok(Self {
test_data_path: PathBuf::from("testdata"),
@@ -72,9 +74,11 @@ impl DataFusionEngine {
})
}
- async fn create_catalog(_: &TomlTable) -> anyhow::Result<Arc<dyn
CatalogProvider>> {
- // TODO: support dynamic catalog configuration
- // See: https://github.com/apache/iceberg-rust/issues/1780
+ async fn create_catalog(
+ _catalog_config: Option<&DatafusionCatalogConfig>,
+ ) -> anyhow::Result<Arc<dyn CatalogProvider>> {
+ // TODO: Use catalog_config to load different catalog types via
iceberg-catalog-loader
+ // See: https://github.com/apache/iceberg-rust/issues/1780
let catalog = MemoryCatalogBuilder::default()
.load(
"memory",
diff --git a/crates/sqllogictest/src/engine/mod.rs
b/crates/sqllogictest/src/engine/mod.rs
index 724359fbe..a27667140 100644
--- a/crates/sqllogictest/src/engine/mod.rs
+++ b/crates/sqllogictest/src/engine/mod.rs
@@ -17,29 +17,45 @@
mod datafusion;
+use std::collections::HashMap;
use std::path::Path;
use anyhow::anyhow;
+use serde::Deserialize;
use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file};
-use toml::Table as TomlTable;
use crate::engine::datafusion::DataFusionEngine;
use crate::error::{Error, Result};
-const TYPE_DATAFUSION: &str = "datafusion";
+/// Configuration for the catalog used by the DataFusion engine
+#[derive(Debug, Clone, Deserialize)]
+pub struct DatafusionCatalogConfig {
+ /// Catalog type: "memory", "rest", "glue", "hms", "s3tables", "sql"
+ #[serde(rename = "type")]
+ pub catalog_type: String,
+ /// Catalog properties passed to the catalog loader
+ #[serde(default)]
+ pub props: HashMap<String, String>,
+}
+
+/// Engine configuration as a tagged enum
+#[derive(Debug, Clone, Deserialize)]
+#[serde(tag = "type", rename_all = "lowercase")]
+pub enum EngineConfig {
+ Datafusion {
+ #[serde(default)]
+ catalog: Option<DatafusionCatalogConfig>,
+ },
+}
#[async_trait::async_trait]
pub trait EngineRunner: Send {
async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
}
-pub async fn load_engine_runner(
- engine_type: &str,
- cfg: TomlTable,
-) -> Result<Box<dyn EngineRunner>> {
- match engine_type {
- TYPE_DATAFUSION => Ok(Box::new(DataFusionEngine::new(cfg).await?)),
- _ => Err(anyhow::anyhow!("Unsupported engine type:
{engine_type}").into()),
+pub async fn load_engine_runner(config: EngineConfig) -> Result<Box<dyn
EngineRunner>> {
+ match config {
+ EngineConfig::Datafusion { catalog } =>
Ok(Box::new(DataFusionEngine::new(catalog).await?)),
}
}
@@ -65,29 +81,63 @@ where
#[cfg(test)]
mod tests {
- use crate::engine::{TYPE_DATAFUSION, load_engine_runner};
+ use crate::engine::{DatafusionCatalogConfig, EngineConfig,
load_engine_runner};
- #[tokio::test]
- async fn test_engine_invalid_type() {
+ #[test]
+ fn test_deserialize_engine_config() {
+ let input = r#"type = "datafusion""#;
+
+ let config: EngineConfig = toml::from_str(input).unwrap();
+ assert!(matches!(config, EngineConfig::Datafusion { catalog: None }));
+ }
+
+ #[test]
+ fn test_deserialize_engine_config_with_catalog() {
+ let input = r#"
+ type = "datafusion"
+
+ [catalog]
+ type = "rest"
+
+ [catalog.props]
+ uri = "http://localhost:8181"
+ "#;
+
+ let config: EngineConfig = toml::from_str(input).unwrap();
+ match config {
+ EngineConfig::Datafusion { catalog: Some(cat) } => {
+ assert_eq!(cat.catalog_type, "rest");
+ assert_eq!(
+ cat.props.get("uri"),
+ Some(&"http://localhost:8181".to_string())
+ );
+ }
+ _ => panic!("Expected Datafusion with catalog"),
+ }
+ }
+
+ #[test]
+ fn test_deserialize_catalog_config() {
let input = r#"
- [engines]
- random = { type = "random_engine", url = "http://localhost:8181" }
+ type = "memory"
+
+ [props]
+ warehouse = "file:///tmp/warehouse"
"#;
- let tbl = toml::from_str(input).unwrap();
- let result = load_engine_runner("random_engine", tbl).await;
- assert!(result.is_err());
+ let config: DatafusionCatalogConfig = toml::from_str(input).unwrap();
+ assert_eq!(config.catalog_type, "memory");
+ assert_eq!(
+ config.props.get("warehouse"),
+ Some(&"file:///tmp/warehouse".to_string())
+ );
}
#[tokio::test]
async fn test_load_datafusion() {
- let input = r#"
- [engines]
- df = { type = "datafusion" }
- "#;
- let tbl = toml::from_str(input).unwrap();
- let result = load_engine_runner(TYPE_DATAFUSION, tbl).await;
+ let config = EngineConfig::Datafusion { catalog: None };
+ let result = load_engine_runner(config).await;
assert!(result.is_ok());
}
}
diff --git a/crates/sqllogictest/src/schedule.rs
b/crates/sqllogictest/src/schedule.rs
index 7c13ad4d1..25728a296 100644
--- a/crates/sqllogictest/src/schedule.rs
+++ b/crates/sqllogictest/src/schedule.rs
@@ -21,10 +21,18 @@ use std::path::{Path, PathBuf};
use anyhow::{Context, anyhow};
use serde::{Deserialize, Serialize};
-use toml::{Table as TomlTable, Value};
use tracing::info;
-use crate::engine::{EngineRunner, load_engine_runner};
+use crate::engine::{EngineConfig, EngineRunner, load_engine_runner};
+
+/// Raw configuration parsed from the schedule TOML file
+#[derive(Debug, Clone, Deserialize)]
+pub struct ScheduleConfig {
+ /// Engine name to engine configuration
+ pub engines: HashMap<String, EngineConfig>,
+ /// List of test steps to run
+ pub steps: Vec<Step>,
+}
pub struct Schedule {
/// Engine names to engine instances
@@ -59,15 +67,27 @@ impl Schedule {
pub async fn from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
let path_str = path.as_ref().to_string_lossy().to_string();
let content = read_to_string(path)?;
- let toml_value = content.parse::<Value>()?;
- let toml_table = toml_value
- .as_table()
- .ok_or_else(|| anyhow!("Schedule file must be a TOML table"))?;
- let engines = Schedule::parse_engines(toml_table).await?;
- let steps = Schedule::parse_steps(toml_table)?;
+ let config: ScheduleConfig = toml::from_str(&content)
+ .with_context(|| format!("Failed to parse schedule file:
{path_str}"))?;
- Ok(Self::new(engines, steps, path_str))
+ let engines = Self::instantiate_engines(config.engines).await?;
+
+ Ok(Self::new(engines, config.steps, path_str))
+ }
+
+ /// Instantiate engine runners from their configurations
+ async fn instantiate_engines(
+ configs: HashMap<String, EngineConfig>,
+ ) -> anyhow::Result<HashMap<String, Box<dyn EngineRunner>>> {
+ let mut engines = HashMap::new();
+
+ for (name, config) in configs {
+ let engine = load_engine_runner(config).await?;
+ engines.insert(name, engine);
+ }
+
+ Ok(engines)
}
pub async fn run(mut self) -> anyhow::Result<()> {
@@ -105,103 +125,131 @@ impl Schedule {
}
Ok(())
}
+}
- async fn parse_engines(
- table: &TomlTable,
- ) -> anyhow::Result<HashMap<String, Box<dyn EngineRunner>>> {
- let engines_tbl = table
- .get("engines")
- .with_context(|| "Schedule file must have an 'engines' table")?
- .as_table()
- .ok_or_else(|| anyhow!("'engines' must be a table"))?;
-
- let mut engines = HashMap::new();
-
- for (name, engine_val) in engines_tbl {
- let cfg_tbl = engine_val
- .as_table()
- .ok_or_else(|| anyhow!("Config of engine '{name}' is not a
table"))?
- .clone();
-
- let engine_type = cfg_tbl
- .get("type")
- .ok_or_else(|| anyhow::anyhow!("Engine {name} doesn't have a
'type' field"))?
- .as_str()
- .ok_or_else(|| anyhow::anyhow!("Engine {name} type must be a
string"))?;
-
- let engine = load_engine_runner(engine_type,
cfg_tbl.clone()).await?;
-
- if engines.insert(name.clone(), engine).is_some() {
- return Err(anyhow!("Duplicate engine '{name}'"));
- }
- }
+#[cfg(test)]
+mod tests {
+ use crate::engine::EngineConfig;
+ use crate::schedule::ScheduleConfig;
- Ok(engines)
- }
+ #[test]
+ fn test_deserialize_schedule_config() {
+ let input = r#"
+ [engines]
+ df = { type = "datafusion" }
- fn parse_steps(table: &TomlTable) -> anyhow::Result<Vec<Step>> {
- let steps_val = table
- .get("steps")
- .with_context(|| "Schedule file must have a 'steps' array")?;
+ [[steps]]
+ engine = "df"
+ slt = "test.slt"
+ "#;
- let steps: Vec<Step> = steps_val
- .clone()
- .try_into()
- .with_context(|| "Failed to deserialize steps")?;
+ let config: ScheduleConfig = toml::from_str(input).unwrap();
- Ok(steps)
+ assert_eq!(config.engines.len(), 1);
+ assert!(config.engines.contains_key("df"));
+ assert!(matches!(config.engines["df"], EngineConfig::Datafusion {
+ catalog: None
+ }));
+ assert_eq!(config.steps.len(), 1);
+ assert_eq!(config.steps[0].engine, "df");
+ assert_eq!(config.steps[0].slt, "test.slt");
}
-}
-
-#[cfg(test)]
-mod tests {
- use toml::Table as TomlTable;
-
- use crate::schedule::Schedule;
#[test]
- fn test_parse_steps() {
+ fn test_deserialize_multiple_steps() {
let input = r#"
+ [engines]
+ datafusion = { type = "datafusion" }
+
[[steps]]
engine = "datafusion"
slt = "test.slt"
[[steps]]
- engine = "spark"
+ engine = "datafusion"
slt = "test2.slt"
"#;
- let tbl: TomlTable = toml::from_str(input).unwrap();
- let steps = Schedule::parse_steps(&tbl).unwrap();
+ let config: ScheduleConfig = toml::from_str(input).unwrap();
- assert_eq!(steps.len(), 2);
- assert_eq!(steps[0].engine, "datafusion");
- assert_eq!(steps[0].slt, "test.slt");
- assert_eq!(steps[1].engine, "spark");
- assert_eq!(steps[1].slt, "test2.slt");
+ assert_eq!(config.steps.len(), 2);
+ assert_eq!(config.steps[0].engine, "datafusion");
+ assert_eq!(config.steps[0].slt, "test.slt");
+ assert_eq!(config.steps[1].engine, "datafusion");
+ assert_eq!(config.steps[1].slt, "test2.slt");
}
#[test]
- fn test_parse_steps_empty() {
+ fn test_deserialize_with_catalog_config() {
let input = r#"
+ [engines.df]
+ type = "datafusion"
+
+ [engines.df.catalog]
+ type = "rest"
+
+ [engines.df.catalog.props]
+ uri = "http://localhost:8181"
+
[[steps]]
+ engine = "df"
+ slt = "test.slt"
"#;
- let tbl: TomlTable = toml::from_str(input).unwrap();
- let steps = Schedule::parse_steps(&tbl);
+ let config: ScheduleConfig = toml::from_str(input).unwrap();
- assert!(steps.is_err());
+ match &config.engines["df"] {
+ EngineConfig::Datafusion { catalog: Some(cat) } => {
+ assert_eq!(cat.catalog_type, "rest");
+ assert_eq!(
+ cat.props.get("uri"),
+ Some(&"http://localhost:8181".to_string())
+ );
+ }
+ _ => panic!("Expected Datafusion with catalog config"),
+ }
}
- #[tokio::test]
- async fn test_parse_engines_invalid_table() {
- let toml_content = r#"
- engines = "not_a_table"
+ #[test]
+ fn test_deserialize_missing_engine_type() {
+ let input = r#"
+ [engines]
+ df = { }
+
+ [[steps]]
+ engine = "df"
+ slt = "test.slt"
"#;
- let table: TomlTable = toml::from_str(toml_content).unwrap();
- let result = Schedule::parse_engines(&table).await;
+ let result: Result<ScheduleConfig, _> = toml::from_str(input);
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_deserialize_invalid_engine_type() {
+ let input = r#"
+ [engines]
+ df = { type = "unknown_engine" }
+
+ [[steps]]
+ engine = "df"
+ slt = "test.slt"
+ "#;
+
+ let result: Result<ScheduleConfig, _> = toml::from_str(input);
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn test_deserialize_missing_step_fields() {
+ let input = r#"
+ [engines]
+ df = { type = "datafusion" }
+
+ [[steps]]
+ "#;
+ let result: Result<ScheduleConfig, _> = toml::from_str(input);
assert!(result.is_err());
}
}