This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 45d4948b35 Validate parquet writer version (#19515)
45d4948b35 is described below
commit 45d4948b3535895515c6d6a8f550e59fe71c26aa
Author: Aly Abdelmoneim <[email protected]>
AuthorDate: Mon Jan 5 02:22:48 2026 +0200
Validate parquet writer version (#19515)
## Which issue does this PR close?
- Part of #17498.
## Rationale for this change
Currently, invalid `writer_version` values (e.g., "3.0") are accepted at
`SET` time and only fail later when writing parquet files. This PR adds
early validation so invalid values are rejected immediately with clear
error messages, following the same pattern as `ExplainFormat`.
## What changes are included in this PR?
- Add `DFWriterVersion` enum with `FromStr`, `Display`, `ConfigField`
implementations
- Change `ParquetOptions.writer_version` from `String` to
`DFWriterVersion`
- Remove `parse_version_string` function (validation now happens at
config time)
- Update proto conversions to validate during deserialization
- Add test for early validation
## Are these changes tested?
Yes. Added `test_parquet_writer_version_validation` that verifies:
- Valid values ("1.0", "2.0") are accepted
- Invalid values ("3.0", "invalid") are rejected immediately at SET time
- Error messages are clear and helpful
## Are there any user-facing changes?
Only in error timing and messaging. Invalid `writer_version` values now error immediately when
set via `SET` command or proto deserialization, instead of failing later
during parquet file writing. This provides better error messages and
earlier feedback.
So the change is in the feedback, not in the set of accepted inputs.
---
datafusion/common/src/config.rs | 36 ++++++-
.../common/src/file_options/parquet_writer.rs | 28 ++----
datafusion/common/src/lib.rs | 2 +-
datafusion/common/src/parquet_config.rs | 108 +++++++++++++++++++++
datafusion/proto-common/src/from_proto/mod.rs | 4 +-
datafusion/proto-common/src/to_proto/mod.rs | 2 +-
datafusion/proto/src/logical_plan/file_formats.rs | 7 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 3 +-
8 files changed, 164 insertions(+), 26 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 2bea2ec5a4..95a0214743 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -23,6 +23,7 @@ use arrow_ipc::CompressionType;
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
+use crate::parquet_config::DFParquetWriterVersion;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
@@ -742,7 +743,7 @@ config_namespace! {
/// (writing) Sets parquet writer version
/// valid values are "1.0" and "2.0"
- pub writer_version: String, default = "1.0".to_string()
+ pub writer_version: DFParquetWriterVersion, default =
DFParquetWriterVersion::default()
/// (writing) Skip encoding the embedded arrow metadata in the KV_meta
///
@@ -3455,4 +3456,37 @@ mod tests {
let parsed_metadata = table_config.parquet.key_value_metadata;
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
}
+ #[cfg(feature = "parquet")]
+ #[test]
+ fn test_parquet_writer_version_validation() {
+ use crate::{config::ConfigOptions,
parquet_config::DFParquetWriterVersion};
+
+ let mut config = ConfigOptions::default();
+
+ // Valid values should work
+ config
+ .set("datafusion.execution.parquet.writer_version", "1.0")
+ .unwrap();
+ assert_eq!(
+ config.execution.parquet.writer_version,
+ DFParquetWriterVersion::V1_0
+ );
+
+ config
+ .set("datafusion.execution.parquet.writer_version", "2.0")
+ .unwrap();
+ assert_eq!(
+ config.execution.parquet.writer_version,
+ DFParquetWriterVersion::V2_0
+ );
+
+ // Invalid value should error immediately at SET time
+ let err = config
+ .set("datafusion.execution.parquet.writer_version", "3.0")
+ .unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Invalid or Unsupported Configuration: Invalid parquet writer
version: 3.0. Expected one of: 1.0, 2.0"
+ );
+ }
}
diff --git a/datafusion/common/src/file_options/parquet_writer.rs
b/datafusion/common/src/file_options/parquet_writer.rs
index 8aa0134d09..196cb96f38 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -33,7 +33,7 @@ use parquet::{
metadata::KeyValue,
properties::{
DEFAULT_STATISTICS_ENABLED, EnabledStatistics, WriterProperties,
- WriterPropertiesBuilder, WriterVersion,
+ WriterPropertiesBuilder,
},
},
schema::types::ColumnPath,
@@ -214,7 +214,7 @@ impl ParquetOptions {
let mut builder = WriterProperties::builder()
.set_data_page_size_limit(*data_pagesize_limit)
.set_write_batch_size(*write_batch_size)
- .set_writer_version(parse_version_string(writer_version.as_str())?)
+ .set_writer_version((*writer_version).into())
.set_dictionary_page_size_limit(*dictionary_page_size_limit)
.set_statistics_enabled(
statistics_enabled
@@ -373,18 +373,6 @@ pub fn parse_compression_string(
}
}
-pub(crate) fn parse_version_string(str_setting: &str) -> Result<WriterVersion>
{
- let str_setting_lower: &str = &str_setting.to_lowercase();
- match str_setting_lower {
- "1.0" => Ok(WriterVersion::PARQUET_1_0),
- "2.0" => Ok(WriterVersion::PARQUET_2_0),
- _ => Err(DataFusionError::Configuration(format!(
- "Unknown or unsupported parquet writer version {str_setting} \
- valid options are 1.0 and 2.0"
- ))),
- }
-}
-
pub(crate) fn parse_statistics_string(str_setting: &str) ->
Result<EnabledStatistics> {
let str_setting_lower: &str = &str_setting.to_lowercase();
match str_setting_lower {
@@ -405,6 +393,7 @@ mod tests {
#[cfg(feature = "parquet_encryption")]
use crate::config::ConfigFileEncryptionProperties;
use crate::config::{ParquetColumnOptions, ParquetEncryptionOptions,
ParquetOptions};
+ use crate::parquet_config::DFParquetWriterVersion;
use parquet::basic::Compression;
use parquet::file::properties::{
BloomFilterProperties, DEFAULT_BLOOM_FILTER_FPP,
DEFAULT_BLOOM_FILTER_NDV,
@@ -431,16 +420,17 @@ mod tests {
fn parquet_options_with_non_defaults() -> ParquetOptions {
let defaults = ParquetOptions::default();
- let writer_version = if defaults.writer_version.eq("1.0") {
- "2.0"
+ let writer_version = if
defaults.writer_version.eq(&DFParquetWriterVersion::V1_0)
+ {
+ DFParquetWriterVersion::V2_0
} else {
- "1.0"
+ DFParquetWriterVersion::V1_0
};
ParquetOptions {
data_pagesize_limit: 42,
write_batch_size: 42,
- writer_version: writer_version.into(),
+ writer_version,
compression: Some("zstd(22)".into()),
dictionary_enabled:
Some(!defaults.dictionary_enabled.unwrap_or(false)),
dictionary_page_size_limit: 42,
@@ -548,7 +538,7 @@ mod tests {
// global options
data_pagesize_limit: props.dictionary_page_size_limit(),
write_batch_size: props.write_batch_size(),
- writer_version: format!("{}.0",
props.writer_version().as_num()),
+ writer_version: props.writer_version().into(),
dictionary_page_size_limit: props.dictionary_page_size_limit(),
max_row_group_size: props.max_row_group_size(),
created_by: props.created_by().to_string(),
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 2dbc086886..df6659c6f8 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -51,6 +51,7 @@ pub mod instant;
pub mod metadata;
pub mod nested_struct;
mod null_equality;
+pub mod parquet_config;
pub mod parsers;
pub mod pruning;
pub mod rounding;
@@ -61,7 +62,6 @@ pub mod test_util;
pub mod tree_node;
pub mod types;
pub mod utils;
-
/// Reexport arrow crate
pub use arrow;
pub use column::Column;
diff --git a/datafusion/common/src/parquet_config.rs
b/datafusion/common/src/parquet_config.rs
new file mode 100644
index 0000000000..9d6d7a8856
--- /dev/null
+++ b/datafusion/common/src/parquet_config.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::{self, Display};
+use std::str::FromStr;
+
+use crate::config::{ConfigField, Visit};
+use crate::error::{DataFusionError, Result};
+
+/// Parquet writer version options for controlling the Parquet file format
version
+///
+/// This enum validates parquet writer version values at configuration time,
+/// ensuring only valid versions ("1.0" or "2.0") can be set via `SET` commands
+/// or proto deserialization.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum DFParquetWriterVersion {
+ /// Parquet format version 1.0
+ #[default]
+ V1_0,
+ /// Parquet format version 2.0
+ V2_0,
+}
+
+/// Implement parsing strings to `DFParquetWriterVersion`
+impl FromStr for DFParquetWriterVersion {
+ type Err = DataFusionError;
+
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "1.0" => Ok(DFParquetWriterVersion::V1_0),
+ "2.0" => Ok(DFParquetWriterVersion::V2_0),
+ other => Err(DataFusionError::Configuration(format!(
+ "Invalid parquet writer version: {other}. Expected one of:
1.0, 2.0"
+ ))),
+ }
+ }
+}
+
+impl Display for DFParquetWriterVersion {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let s = match self {
+ DFParquetWriterVersion::V1_0 => "1.0",
+ DFParquetWriterVersion::V2_0 => "2.0",
+ };
+ write!(f, "{s}")
+ }
+}
+
+impl ConfigField for DFParquetWriterVersion {
+ fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str)
{
+ v.some(key, self, description)
+ }
+
+ fn set(&mut self, _: &str, value: &str) -> Result<()> {
+ *self = DFParquetWriterVersion::from_str(value)?;
+ Ok(())
+ }
+}
+
+/// Convert `DFParquetWriterVersion` to parquet crate's `WriterVersion`
+///
+/// This conversion is infallible since `DFParquetWriterVersion` only contains
+/// valid values that have been validated at configuration time.
+#[cfg(feature = "parquet")]
+impl From<DFParquetWriterVersion> for parquet::file::properties::WriterVersion
{
+ fn from(value: DFParquetWriterVersion) -> Self {
+ match value {
+ DFParquetWriterVersion::V1_0 => {
+ parquet::file::properties::WriterVersion::PARQUET_1_0
+ }
+ DFParquetWriterVersion::V2_0 => {
+ parquet::file::properties::WriterVersion::PARQUET_2_0
+ }
+ }
+ }
+}
+
+/// Convert parquet crate's `WriterVersion` to `DFParquetWriterVersion`
+///
+/// This is used when converting from existing parquet writer properties,
+/// such as when reading from proto or test code.
+#[cfg(feature = "parquet")]
+impl From<parquet::file::properties::WriterVersion> for DFParquetWriterVersion
{
+ fn from(version: parquet::file::properties::WriterVersion) -> Self {
+ match version {
+ parquet::file::properties::WriterVersion::PARQUET_1_0 => {
+ DFParquetWriterVersion::V1_0
+ }
+ parquet::file::properties::WriterVersion::PARQUET_2_0 => {
+ DFParquetWriterVersion::V2_0
+ }
+ }
+ }
+}
diff --git a/datafusion/proto-common/src/from_proto/mod.rs
b/datafusion/proto-common/src/from_proto/mod.rs
index 7cb7a92031..e8e71c3884 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -951,7 +951,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
force_filter_selections: value.force_filter_selections,
data_pagesize_limit: value.data_pagesize_limit as usize,
write_batch_size: value.write_batch_size as usize,
- writer_version: value.writer_version.clone(),
+ writer_version: value.writer_version.parse().map_err(|e| {
+ DataFusionError::Internal(format!("Failed to parse
writer_version: {e}"))
+ })?,
compression: value.compression_opt.clone().map(|opt| match opt {
protobuf::parquet_options::CompressionOpt::Compression(v) =>
Some(v),
}).unwrap_or(None),
diff --git a/datafusion/proto-common/src/to_proto/mod.rs
b/datafusion/proto-common/src/to_proto/mod.rs
index dfa136717f..fee3656482 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -860,7 +860,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
force_filter_selections: value.force_filter_selections,
data_pagesize_limit: value.data_pagesize_limit as u64,
write_batch_size: value.write_batch_size as u64,
- writer_version: value.writer_version.clone(),
+ writer_version: value.writer_version.to_string(),
compression_opt:
value.compression.clone().map(protobuf::parquet_options::CompressionOpt::Compression),
dictionary_enabled_opt:
value.dictionary_enabled.map(protobuf::parquet_options::DictionaryEnabledOpt::DictionaryEnabled),
dictionary_page_size_limit: value.dictionary_page_size_limit as
u64,
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs
b/datafusion/proto/src/logical_plan/file_formats.rs
index 87ce4d524f..436a064937 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -381,7 +381,7 @@ mod parquet {
force_filter_selections:
global_options.global.force_filter_selections,
data_pagesize_limit: global_options.global.data_pagesize_limit
as u64,
write_batch_size: global_options.global.write_batch_size as
u64,
- writer_version: global_options.global.writer_version.clone(),
+ writer_version:
global_options.global.writer_version.to_string(),
compression_opt:
global_options.global.compression.map(|compression| {
parquet_options::CompressionOpt::Compression(compression)
}),
@@ -477,7 +477,10 @@ mod parquet {
force_filter_selections: proto.force_filter_selections,
data_pagesize_limit: proto.data_pagesize_limit as usize,
write_batch_size: proto.write_batch_size as usize,
- writer_version: proto.writer_version.clone(),
+ // TODO: Consider changing to TryFrom to avoid panic on
invalid proto data
+ writer_version: proto.writer_version.parse().expect("
+ Invalid parquet writer version in proto, expected '1.0' or
'2.0'
+ "),
compression: proto.compression_opt.as_ref().map(|opt| match opt {
parquet_options::CompressionOpt::Compression(compression) =>
compression.clone(),
}),
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 77676fc2fd..bcfda648b5 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -31,6 +31,7 @@ use datafusion::datasource::listing::{
use datafusion::execution::options::ArrowReadOptions;
use datafusion::optimizer::Optimizer;
use datafusion::optimizer::optimize_unions::OptimizeUnions;
+use datafusion_common::parquet_config::DFParquetWriterVersion;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_functions_aggregate::sum::sum_distinct;
use prost::Message;
@@ -464,7 +465,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() ->
Result<()> {
parquet_format.global.bloom_filter_on_read = true;
parquet_format.global.created_by = "DataFusion Test".to_string();
- parquet_format.global.writer_version = "PARQUET_2_0".to_string();
+ parquet_format.global.writer_version = DFParquetWriterVersion::V2_0;
parquet_format.global.write_batch_size = 111;
parquet_format.global.data_pagesize_limit = 222;
parquet_format.global.data_page_row_count_limit = 333;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]