This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 8b8451a refactor: replace custom `.to::<T>()` with `into()` for
`HudiConfig` conversions (#432)
8b8451a is described below
commit 8b8451a57aff308a071bd06e05dc77377c879f77
Author: Yunchi Pang <[email protected]>
AuthorDate: Fri Oct 3 10:36:13 2025 -0700
refactor: replace custom `.to::<T>()` with `into()` for `HudiConfig`
conversions (#432)
---
crates/core/src/config/mod.rs | 11 ---
crates/core/src/config/read.rs | 42 +++-----
crates/core/src/config/table.rs | 27 ++---
crates/core/src/file_group/log_file/reader.rs | 4 +-
crates/core/src/file_group/reader.rs | 16 +--
crates/core/src/file_group/record_batches.rs | 4 +-
crates/core/src/merge/ordering.rs | 4 +-
crates/core/src/merge/record_merger.rs | 17 +---
crates/core/src/schema/resolver.rs | 4 +-
crates/core/src/table/listing.rs | 10 +-
crates/core/src/table/mod.rs | 136 ++++++++++++--------------
crates/core/src/table/partition.rs | 15 +--
crates/core/src/table/validation.rs | 13 +--
crates/core/src/timeline/selector.rs | 2 +-
crates/datafusion/src/lib.rs | 2 +-
15 files changed, 125 insertions(+), 182 deletions(-)
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 06dbabf..6e9502a 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -92,17 +92,6 @@ pub enum HudiConfigValue {
}
impl HudiConfigValue {
- /// Covert [HudiConfigValue] logical type to the representing data type in
Rust.
- ///
- /// - [`HudiConfigValue::Boolean`] -> [bool]
- /// - [`HudiConfigValue::Integer`] -> [isize]
- /// - [`HudiConfigValue::UInteger`] -> [usize]
- /// - [`HudiConfigValue::String`] -> [String]
- /// - [`HudiConfigValue::List`] -> [`Vec<String>`]
- pub fn to<T: 'static + std::fmt::Debug + From<HudiConfigValue>>(self) -> T
{
- T::from(self)
- }
-
/// A convenience method to convert [HudiConfigValue] to [Url] when the
value is a [String] and is intended to be a URL.
/// Panic if the value is not a [String].
pub fn to_url(self) -> StorageResult<Url> {
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index f550724..dae0a44 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -141,21 +141,12 @@ mod tests {
"true".to_string(),
),
]);
- assert_eq!(
- InputPartitions.parse_value(&options).unwrap().to::<usize>(),
- 100
- );
- assert_eq!(
- ListingParallelism
- .parse_value(&options)
- .unwrap()
- .to::<usize>(),
- 100
- );
- assert!(UseReadOptimizedMode
- .parse_value(&options)
- .unwrap()
- .to::<bool>());
+ let actual: usize =
InputPartitions.parse_value(&options).unwrap().into();
+ assert_eq!(actual, 100);
+ let actual: usize =
ListingParallelism.parse_value(&options).unwrap().into();
+ assert_eq!(actual, 100);
+ let actual: bool =
UseReadOptimizedMode.parse_value(&options).unwrap().into();
+ assert!(actual);
}
#[test]
@@ -169,28 +160,19 @@ mod tests {
InputPartitions.parse_value(&options).unwrap_err(),
ParseInt(_, _, _)
));
- assert_eq!(
- InputPartitions
- .parse_value_or_default(&options)
- .to::<usize>(),
- 0
- );
+ let actual: usize =
InputPartitions.parse_value_or_default(&options).into();
+ assert_eq!(actual, 0);
assert!(matches!(
ListingParallelism.parse_value(&options).unwrap_err(),
ParseInt(_, _, _)
));
- assert_eq!(
- ListingParallelism
- .parse_value_or_default(&options)
- .to::<usize>(),
- 10
- );
+ let actual: usize =
ListingParallelism.parse_value_or_default(&options).into();
+ assert_eq!(actual, 10);
assert!(matches!(
UseReadOptimizedMode.parse_value(&options).unwrap_err(),
ParseBool(_, _, _)
));
- assert!(!UseReadOptimizedMode
- .parse_value_or_default(&options)
- .to::<bool>())
+ let actual: bool =
UseReadOptimizedMode.parse_value_or_default(&options).into();
+ assert!(!actual)
}
}
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index 5264236..06f3dec 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -245,9 +245,9 @@ impl ConfigParser for HudiTableConfig {
self.parse_value(configs).unwrap_or_else(|_| {
match self {
Self::RecordMergeStrategy => {
- let populates_meta_fields =
HudiTableConfig::PopulatesMetaFields
+ let populates_meta_fields: bool =
HudiTableConfig::PopulatesMetaFields
.parse_value_or_default(configs)
- .to::<bool>();
+ .into();
if !populates_meta_fields {
// When populatesMetaFields is false, meta fields such
as record key and
// partition path are null, the table is supposed to
be append-only.
@@ -483,19 +483,21 @@ mod tests {
(HudiTableConfig::PopulatesMetaFields, "false"),
(HudiTableConfig::PrecombineField, "ts"),
]);
+ let actual: String = hudi_configs
+ .get_or_default(HudiTableConfig::RecordMergeStrategy)
+ .into();
assert_eq!(
- hudi_configs
- .get_or_default(HudiTableConfig::RecordMergeStrategy)
- .to::<String>(),
+ actual,
RecordMergeStrategyValue::AppendOnly.as_ref(),
"Should derive as append-only due to populatesMetaFields=false"
);
let hudi_configs =
HudiConfigs::new(vec![(HudiTableConfig::PopulatesMetaFields, "true")]);
+ let actual: String = hudi_configs
+ .get_or_default(HudiTableConfig::RecordMergeStrategy)
+ .into();
assert_eq!(
- hudi_configs
- .get_or_default(HudiTableConfig::RecordMergeStrategy)
- .to::<String>(),
+ actual,
RecordMergeStrategyValue::AppendOnly.as_ref(),
"Should derive as append-only due to missing precombine field"
);
@@ -504,11 +506,12 @@ mod tests {
(HudiTableConfig::PopulatesMetaFields, "true"),
(HudiTableConfig::PrecombineField, "ts"),
]);
+ let actual: String = hudi_configs
+ .get_or_default(HudiTableConfig::RecordMergeStrategy)
+ .into();
assert_eq!(
- hudi_configs
- .get_or_default(HudiTableConfig::RecordMergeStrategy)
- .to::<String>(),
- RecordMergeStrategyValue::OverwriteWithLatest.as_ref(),
+ actual,
+ RecordMergeStrategyValue::OverwriteWithLatest.as_ref()
);
}
}
diff --git a/crates/core/src/file_group/log_file/reader.rs
b/crates/core/src/file_group/log_file/reader.rs
index e0ad3ca..424994a 100644
--- a/crates/core/src/file_group/log_file/reader.rs
+++ b/crates/core/src/file_group/log_file/reader.rs
@@ -55,9 +55,9 @@ impl LogFileReader<StorageReader> {
relative_path: &str,
) -> Result<Self> {
let reader = storage.get_storage_reader(relative_path).await?;
- let timezone = hudi_configs
+ let timezone: String = hudi_configs
.get_or_default(HudiTableConfig::TimelineTimezone)
- .to::<String>();
+ .into();
Ok(Self {
hudi_configs,
storage,
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index b562338..53abbfb 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -107,10 +107,10 @@ impl FileGroupReader {
&self,
records: &RecordBatch,
) -> Result<Option<BooleanArray>> {
- let populates_meta_fields = self
+ let populates_meta_fields: bool = self
.hudi_configs
.get_or_default(HudiTableConfig::PopulatesMetaFields)
- .to::<bool>();
+ .into();
if !populates_meta_fields {
// If meta fields are not populated, commit time filtering is not
applicable.
return Ok(None);
@@ -121,7 +121,7 @@ impl FileGroupReader {
if let Some(start) = self
.hudi_configs
.try_get(HudiReadConfig::FileGroupStartTimestamp)
- .map(|v| v.to::<String>())
+ .map(|v| -> String { v.into() })
{
let filter: Filter =
Filter::try_from((MetaField::CommitTime.as_ref(), ">",
start.as_str()))?;
@@ -137,7 +137,7 @@ impl FileGroupReader {
if let Some(end) = self
.hudi_configs
.try_get(HudiReadConfig::FileGroupEndTimestamp)
- .map(|v| v.to::<String>())
+ .map(|v| -> String { v.into() })
{
let filter = Filter::try_from((MetaField::CommitTime.as_ref(),
"<=", end.as_str()))?;
let filter = SchemableFilter::try_from((filter, schema.as_ref()))?;
@@ -201,15 +201,15 @@ impl FileGroupReader {
let timezone = self
.hudi_configs
.get_or_default(HudiTableConfig::TimelineTimezone)
- .to::<String>();
+ .into();
let start_timestamp = self
.hudi_configs
.try_get(HudiReadConfig::FileGroupStartTimestamp)
- .map(|v| v.to::<String>());
+ .map(|v| -> String { v.into() });
let end_timestamp = self
.hudi_configs
.try_get(HudiReadConfig::FileGroupEndTimestamp)
- .map(|v| v.to::<String>());
+ .map(|v| -> String { v.into() });
InstantRange::new(timezone, start_timestamp, end_timestamp, false,
true)
}
@@ -226,7 +226,7 @@ impl FileGroupReader {
|| self
.hudi_configs
.get_or_default(HudiReadConfig::UseReadOptimizedMode)
- .to::<bool>();
+ .into();
if base_file_only {
self.read_file_slice_by_base_file_path(&relative_path).await
} else {
diff --git a/crates/core/src/file_group/record_batches.rs
b/crates/core/src/file_group/record_batches.rs
index c7aa7ff..962977b 100644
--- a/crates/core/src/file_group/record_batches.rs
+++ b/crates/core/src/file_group/record_batches.rs
@@ -115,9 +115,7 @@ impl RecordBatches {
&self,
hudi_configs: Arc<HudiConfigs>,
) -> Result<RecordBatch> {
- let ordering_field = hudi_configs
- .get(HudiTableConfig::PrecombineField)?
- .to::<String>();
+ let ordering_field: String =
hudi_configs.get(HudiTableConfig::PrecombineField)?.into();
if self.num_delete_rows == 0 {
return
Ok(RecordBatch::new_empty(SchemaRef::from(Schema::empty())));
diff --git a/crates/core/src/merge/ordering.rs
b/crates/core/src/merge/ordering.rs
index 3f279eb..971e192 100644
--- a/crates/core/src/merge/ordering.rs
+++ b/crates/core/src/merge/ordering.rs
@@ -62,9 +62,7 @@ pub fn process_batch_for_max_orderings(
return Ok(());
}
- let ordering_field = hudi_configs
- .get(HudiTableConfig::PrecombineField)?
- .to::<String>();
+ let ordering_field: String =
hudi_configs.get(HudiTableConfig::PrecombineField)?.into();
let keys = extract_record_keys(key_converter, batch)?;
let event_times =
diff --git a/crates/core/src/merge/record_merger.rs
b/crates/core/src/merge/record_merger.rs
index 9ebde9d..95d8dcb 100644
--- a/crates/core/src/merge/record_merger.rs
+++ b/crates/core/src/merge/record_merger.rs
@@ -51,14 +51,10 @@ pub struct RecordMerger {
impl RecordMerger {
/// Validates the given [HudiConfigs] against the [RecordMergeStrategy].
pub fn validate_configs(hudi_configs: &HudiConfigs) -> ConfigResult<()> {
- let merge_strategy = hudi_configs
- .get_or_default(RecordMergeStrategy)
- .to::<String>();
+ let merge_strategy: String =
hudi_configs.get_or_default(RecordMergeStrategy).into();
let merge_strategy =
RecordMergeStrategyValue::from_str(&merge_strategy)?;
- let populate_meta_fields = hudi_configs
- .get_or_default(PopulatesMetaFields)
- .to::<bool>();
+ let populate_meta_fields: bool =
hudi_configs.get_or_default(PopulatesMetaFields).into();
if !populate_meta_fields && merge_strategy !=
RecordMergeStrategyValue::AppendOnly {
return Err(ConfigError::InvalidValue(format!(
"When {:?} is false, {:?} must be {:?}.",
@@ -91,10 +87,7 @@ impl RecordMerger {
}
pub fn merge_record_batches(&self, record_batches: RecordBatches) ->
Result<RecordBatch> {
- let merge_strategy = self
- .hudi_configs
- .get_or_default(RecordMergeStrategy)
- .to::<String>();
+ let merge_strategy: String =
self.hudi_configs.get_or_default(RecordMergeStrategy).into();
let merge_strategy =
RecordMergeStrategyValue::from_str(&merge_strategy)?;
match merge_strategy {
RecordMergeStrategyValue::AppendOnly => {
@@ -109,7 +102,7 @@ impl RecordMerger {
// Use sorting fields to get sorted indices of the data batch
(inserts and updates)
let key_array =
data_batch.get_array(MetaField::RecordKey.as_ref())?;
- let ordering_field =
self.hudi_configs.get(PrecombineField)?.to::<String>();
+ let ordering_field: String =
self.hudi_configs.get(PrecombineField)?.into();
let ordering_array = data_batch.get_array(&ordering_field)?;
let commit_seqno_array =
data_batch.get_array(MetaField::CommitSeqno.as_ref())?;
let desc_indices =
@@ -117,7 +110,7 @@ impl RecordMerger {
// Create shared converters for record keys and ordering values
let key_converter =
create_record_key_converter(data_batch.schema())?;
- let ordering_field =
self.hudi_configs.get(PrecombineField)?.to::<String>();
+ let ordering_field: String =
self.hudi_configs.get(PrecombineField)?.into();
let event_time_converter =
create_event_time_ordering_converter(data_batch.schema(),
&ordering_field)?;
let commit_time_converter =
diff --git a/crates/core/src/schema/resolver.rs
b/crates/core/src/schema/resolver.rs
index 7095849..803f8c6 100644
--- a/crates/core/src/schema/resolver.rs
+++ b/crates/core/src/schema/resolver.rs
@@ -43,7 +43,7 @@ pub async fn resolve_schema(table: &Table) -> Result<Schema> {
}
Err(CoreError::TimelineNoCommit) => {
if let Some(create_schema) =
table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
- let avro_schema_str = create_schema.to::<String>();
+ let avro_schema_str: String = create_schema.into();
let arrow_schema =
arrow_schema_from_avro_schema_str(&avro_schema_str)?;
prepend_meta_fields(SchemaRef::new(arrow_schema))
} else {
@@ -72,7 +72,7 @@ pub async fn resolve_avro_schema(table: &Table) ->
Result<String> {
Ok(metadata) => resolve_avro_schema_from_commit_metadata(&metadata),
Err(CoreError::TimelineNoCommit) => {
if let Some(create_schema) =
table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
- let create_schema = create_schema.to::<String>();
+ let create_schema: String = create_schema.into();
Ok(sanitize_avro_schema_str(&create_schema))
} else {
Err(CoreError::SchemaNotFound(
diff --git a/crates/core/src/table/listing.rs b/crates/core/src/table/listing.rs
index 08bcac5..dad1c58 100644
--- a/crates/core/src/table/listing.rs
+++ b/crates/core/src/table/listing.rs
@@ -61,10 +61,7 @@ impl FileLister {
}
async fn list_file_groups_for_partition(&self, partition_path: &str) ->
Result<Vec<FileGroup>> {
- let base_file_format = self
- .hudi_configs
- .get_or_default(BaseFileFormat)
- .to::<String>();
+ let base_file_format: String =
self.hudi_configs.get_or_default(BaseFileFormat).into();
let listed_file_metadata =
self.storage.list_files(Some(partition_path)).await?;
@@ -164,10 +161,7 @@ impl FileLister {
let pruned_partition_paths =
self.list_relevant_partition_paths().await?;
let file_groups_map =
Arc::new(DashMap::with_capacity(pruned_partition_paths.len()));
- let parallelism = self
- .hudi_configs
- .get_or_default(ListingParallelism)
- .to::<usize>();
+ let parallelism =
self.hudi_configs.get_or_default(ListingParallelism).into();
stream::iter(pruned_partition_paths)
.map(|p| async move {
let file_groups =
self.list_file_groups_for_partition(&p).await?;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 42e9053..b0d6f4d 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -198,7 +198,7 @@ impl Table {
self.hudi_configs
.get(HudiTableConfig::TableName)
.expect(&err_msg)
- .to::<String>()
+ .into()
}
pub fn table_type(&self) -> String {
@@ -206,7 +206,7 @@ impl Table {
self.hudi_configs
.get(HudiTableConfig::TableType)
.expect(&err_msg)
- .to::<String>()
+ .into()
}
pub fn is_mor(&self) -> bool {
@@ -216,7 +216,7 @@ impl Table {
pub fn timezone(&self) -> String {
self.hudi_configs
.get_or_default(HudiTableConfig::TimelineTimezone)
- .to::<String>()
+ .into()
}
/// Get the latest Avro schema string of the table.
@@ -261,12 +261,10 @@ impl Table {
/// Get the latest partition [arrow_schema::Schema] of the table.
pub async fn get_partition_schema(&self) -> Result<Schema> {
- let partition_fields: HashSet<String> = self
- .hudi_configs
- .get_or_default(PartitionFields)
- .to::<Vec<String>>()
- .into_iter()
- .collect();
+ let partition_fields: HashSet<String> = {
+ let fields: Vec<String> =
self.hudi_configs.get_or_default(PartitionFields).into();
+ fields.into_iter().collect()
+ };
let schema = self.get_schema().await?;
let partition_fields: Vec<Arc<Field>> = schema
@@ -1012,77 +1010,67 @@ mod tests {
fn get_default_for_invalid_table_props() {
let table = get_test_table_without_validation("table_props_invalid");
let configs = table.hudi_configs;
- assert_eq!(
- configs.get_or_default(BaseFileFormat).to::<String>(),
- "parquet"
- );
+ let actual: String = configs.get_or_default(BaseFileFormat).into();
+ assert_eq!(actual, "parquet");
assert!(panic::catch_unwind(||
configs.get_or_default(Checksum)).is_err());
- assert_eq!(
- configs.get_or_default(DatabaseName).to::<String>(),
- "default"
- );
- assert!(!configs.get_or_default(DropsPartitionFields).to::<bool>());
+ let actual: String = configs.get_or_default(DatabaseName).into();
+ assert_eq!(actual, "default");
+ let actual: bool = configs.get_or_default(DropsPartitionFields).into();
+ assert!(!actual);
assert!(panic::catch_unwind(||
configs.get_or_default(IsHiveStylePartitioning)).is_err());
assert!(panic::catch_unwind(||
configs.get_or_default(IsPartitionPathUrlencoded)).is_err());
assert!(panic::catch_unwind(||
configs.get_or_default(KeyGeneratorClass)).is_err());
- assert!(configs
- .get_or_default(PartitionFields)
- .to::<Vec<String>>()
- .is_empty());
+ let actual: Vec<String> =
configs.get_or_default(PartitionFields).into();
+ assert!(actual.is_empty());
assert!(panic::catch_unwind(||
configs.get_or_default(PrecombineField)).is_err());
- assert!(configs.get_or_default(PopulatesMetaFields).to::<bool>());
+ let actual: bool = configs.get_or_default(PopulatesMetaFields).into();
+ assert!(actual);
assert!(panic::catch_unwind(||
configs.get_or_default(RecordKeyFields)).is_err());
assert!(panic::catch_unwind(||
configs.get_or_default(TableName)).is_err());
- assert_eq!(
- configs.get_or_default(TableType).to::<String>(),
- "COPY_ON_WRITE"
- );
+ let actual: String = configs.get_or_default(TableType).into();
+ assert_eq!(actual, "COPY_ON_WRITE");
assert!(panic::catch_unwind(||
configs.get_or_default(TableVersion)).is_err());
assert!(panic::catch_unwind(||
configs.get_or_default(TimelineLayoutVersion)).is_err());
- assert_eq!(
- configs.get_or_default(TimelineTimezone).to::<String>(),
- "utc"
- );
+ let actual: String = configs.get_or_default(TimelineTimezone).into();
+ assert_eq!(actual, "utc");
}
#[test]
fn get_valid_table_props() {
let table = get_test_table_without_validation("table_props_valid");
let configs = table.hudi_configs;
- assert_eq!(
- configs.get(BaseFileFormat).unwrap().to::<String>(),
- "parquet"
- );
- assert_eq!(configs.get(Checksum).unwrap().to::<isize>(), 3761586722);
- assert_eq!(configs.get(DatabaseName).unwrap().to::<String>(), "db");
- assert!(!configs.get(DropsPartitionFields).unwrap().to::<bool>());
- assert!(!configs.get(IsHiveStylePartitioning).unwrap().to::<bool>());
- assert!(!configs.get(IsPartitionPathUrlencoded).unwrap().to::<bool>());
- assert_eq!(
- configs.get(KeyGeneratorClass).unwrap().to::<String>(),
- "org.apache.hudi.keygen.SimpleKeyGenerator"
- );
- assert_eq!(
- configs.get(PartitionFields).unwrap().to::<Vec<String>>(),
- vec!["city"]
- );
- assert_eq!(configs.get(PrecombineField).unwrap().to::<String>(), "ts");
- assert!(configs.get(PopulatesMetaFields).unwrap().to::<bool>());
- assert_eq!(
- configs.get(RecordKeyFields).unwrap().to::<Vec<String>>(),
- vec!["uuid"]
- );
- assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
- assert_eq!(
- configs.get(TableType).unwrap().to::<String>(),
- "COPY_ON_WRITE"
- );
- assert_eq!(configs.get(TableVersion).unwrap().to::<isize>(), 6);
- assert_eq!(configs.get(TimelineLayoutVersion).unwrap().to::<isize>(),
1);
- assert_eq!(
- configs.get(TimelineTimezone).unwrap().to::<String>(),
- "local"
- );
+ let actual: String = configs.get(BaseFileFormat).unwrap().into();
+ assert_eq!(actual, "parquet");
+ let actual: isize = configs.get(Checksum).unwrap().into();
+ assert_eq!(actual, 3761586722);
+ let actual: String = configs.get(DatabaseName).unwrap().into();
+ assert_eq!(actual, "db");
+ let actual: bool = configs.get(DropsPartitionFields).unwrap().into();
+ assert!(!actual);
+ let actual: bool =
configs.get(IsHiveStylePartitioning).unwrap().into();
+ assert!(!actual);
+ let actual: bool =
configs.get(IsPartitionPathUrlencoded).unwrap().into();
+ assert!(!actual);
+ let actual: String = configs.get(KeyGeneratorClass).unwrap().into();
+ assert_eq!(actual, "org.apache.hudi.keygen.SimpleKeyGenerator");
+ let actual: Vec<String> = configs.get(PartitionFields).unwrap().into();
+ assert_eq!(actual, vec!["city"]);
+ let actual: String = configs.get(PrecombineField).unwrap().into();
+ assert_eq!(actual, "ts");
+ let actual: bool = configs.get(PopulatesMetaFields).unwrap().into();
+ assert!(actual);
+ let actual: Vec<String> = configs.get(RecordKeyFields).unwrap().into();
+ assert_eq!(actual, vec!["uuid"]);
+ let actual: String = configs.get(TableName).unwrap().into();
+ assert_eq!(actual, "trips");
+ let actual: String = configs.get(TableType).unwrap().into();
+ assert_eq!(actual, "COPY_ON_WRITE");
+ let actual: isize = configs.get(TableVersion).unwrap().into();
+ assert_eq!(actual, 6);
+ let actual: isize = configs.get(TimelineLayoutVersion).unwrap().into();
+ assert_eq!(actual, 1);
+ let actual: String = configs.get(TimelineTimezone).unwrap().into();
+ assert_eq!(actual, "local");
}
#[test]
@@ -1092,7 +1080,8 @@ mod tests {
let configs = table.hudi_configs;
assert!(configs.get(DatabaseName).is_err());
assert!(configs.get(TableType).is_err());
- assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+ let actual: String = configs.get(TableName).unwrap().into();
+ assert_eq!(actual, "trips");
// Environment variable HUDI_CONF_DIR points to nothing
let base_path = env::current_dir().unwrap();
@@ -1102,7 +1091,8 @@ mod tests {
let configs = table.hudi_configs;
assert!(configs.get(DatabaseName).is_err());
assert!(configs.get(TableType).is_err());
- assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+ let actual: String = configs.get(TableName).unwrap().into();
+ assert_eq!(actual, "trips");
// With global config
let base_path = env::current_dir().unwrap();
@@ -1110,12 +1100,12 @@ mod tests {
env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
let table = get_test_table_without_validation("table_props_partial");
let configs = table.hudi_configs;
- assert_eq!(configs.get(DatabaseName).unwrap().to::<String>(), "tmpdb");
- assert_eq!(
- configs.get(TableType).unwrap().to::<String>(),
- "MERGE_ON_READ"
- );
- assert_eq!(configs.get(TableName).unwrap().to::<String>(), "trips");
+ let actual: String = configs.get(DatabaseName).unwrap().into();
+ assert_eq!(actual, "tmpdb");
+ let actual: String = configs.get(TableType).unwrap().into();
+ assert_eq!(actual, "MERGE_ON_READ");
+ let actual: String = configs.get(TableName).unwrap().into();
+ assert_eq!(actual, "trips");
env::remove_var(HUDI_CONF_DIR)
}
diff --git a/crates/core/src/table/partition.rs
b/crates/core/src/table/partition.rs
index 0378955..46efacd 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -33,15 +33,16 @@ pub const PARTITION_METAFIELD_PREFIX: &str =
".hoodie_partition_metadata";
pub const EMPTY_PARTITION_PATH: &str = "";
pub fn is_table_partitioned(hudi_configs: &HudiConfigs) -> bool {
- let has_partition_fields = !hudi_configs
- .get_or_default(PartitionFields)
- .to::<Vec<String>>()
- .is_empty();
+ let has_partition_fields = {
+ let partition_fields: Vec<String> =
hudi_configs.get_or_default(PartitionFields).into();
+ !partition_fields.is_empty()
+ };
let uses_non_partitioned_key_gen = hudi_configs
.try_get(KeyGeneratorClass)
.map(|key_gen| {
- key_gen.to::<String>() ==
"org.apache.hudi.keygen.NonpartitionedKeyGenerator"
+ let key_gen_str: String = key_gen.into();
+ key_gen_str == "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
})
.unwrap_or(false);
@@ -71,10 +72,10 @@ impl PartitionPruner {
let schema = Arc::new(partition_schema.clone());
let is_hive_style: bool = hudi_configs
.get_or_default(HudiTableConfig::IsHiveStylePartitioning)
- .to();
+ .into();
let is_url_encoded: bool = hudi_configs
.get_or_default(HudiTableConfig::IsPartitionPathUrlencoded)
- .to();
+ .into();
Ok(PartitionPruner {
schema,
is_hive_style,
diff --git a/crates/core/src/table/validation.rs
b/crates/core/src/table/validation.rs
index a2564f5..2829571 100644
--- a/crates/core/src/table/validation.rs
+++ b/crates/core/src/table/validation.rs
@@ -28,10 +28,7 @@ use crate::merge::record_merger::RecordMerger;
use strum::IntoEnumIterator;
pub fn validate_configs(hudi_configs: &HudiConfigs) ->
crate::error::Result<()> {
- if hudi_configs
- .get_or_default(SkipConfigValidation)
- .to::<bool>()
- {
+ if hudi_configs.get_or_default(SkipConfigValidation).into() {
return Ok(());
}
@@ -44,23 +41,21 @@ pub fn validate_configs(hudi_configs: &HudiConfigs) ->
crate::error::Result<()>
}
// additional validation
- let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
+ let table_version: isize = hudi_configs.get(TableVersion)?.into();
if table_version != 6 {
return Err(CoreError::Unsupported(
"Only support table version 6.".to_string(),
));
}
- let timeline_layout_version =
hudi_configs.get(TimelineLayoutVersion)?.to::<isize>();
+ let timeline_layout_version: isize =
hudi_configs.get(TimelineLayoutVersion)?.into();
if timeline_layout_version != 1 {
return Err(CoreError::Unsupported(
"Only support timeline layout version 1.".to_string(),
));
}
- let drops_partition_cols = hudi_configs
- .get_or_default(DropsPartitionFields)
- .to::<bool>();
+ let drops_partition_cols =
hudi_configs.get_or_default(DropsPartitionFields).into();
if drops_partition_cols {
return Err(CoreError::Unsupported(format!(
"Only support when `{}` is disabled",
diff --git a/crates/core/src/timeline/selector.rs
b/crates/core/src/timeline/selector.rs
index 078ff05..8dca297 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -149,7 +149,7 @@ impl TimelineSelector {
fn get_timezone_from_configs(hudi_configs: &HudiConfigs) -> String {
hudi_configs
.get_or_default(HudiTableConfig::TimelineTimezone)
- .to::<String>()
+ .into()
}
fn parse_datetime(timezone: &str, timestamp: Option<&str>) ->
Result<Option<DateTime<Utc>>> {
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 60778a1..9ee6094 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -101,7 +101,7 @@ impl HudiDataSource {
self.table
.hudi_configs
.get_or_default(InputPartitions)
- .to::<usize>()
+ .into()
}
/// Check if the given expression can be pushed down to the Hudi table.