This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c8873af12 Allow adding user defined metadata to `ParquetSink` (#10224)
9c8873af12 is described below

commit 9c8873af12826e47f5743991859790df7a3b6400
Author: wiedld <[email protected]>
AuthorDate: Fri Apr 26 03:42:16 2024 -0700

    Allow adding user defined metadata to `ParquetSink` (#10224)
    
    * chore: make explicit what ParquetWriterOptions are created from a subset 
of TableParquetOptions
    
    * refactor: restore the ability to add kv metadata into the generated file 
sink
    
    * test: demonstrate API contract for metadata TableParquetOptions
    
    * chore: update code docs
    
    * fix: parse on proper delimiter, and improve tests
    
    * fix: enable any character in the metadata string value, by having any key 
parsing be a part of the format.metadata::key
---
 datafusion/common/src/config.rs                    |  73 ++++++++++++++-
 datafusion/common/src/file_options/mod.rs          |   4 +
 .../common/src/file_options/parquet_writer.rs      | 104 +++++++++++++++------
 .../core/src/datasource/file_format/parquet.rs     |  29 +++++-
 datafusion/proto/src/physical_plan/from_proto.rs   |   1 +
 datafusion/sqllogictest/test_files/copy.slt        |  64 ++++++++++++-
 6 files changed, 238 insertions(+), 37 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 30ab9a339b..8539ca0874 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1364,12 +1364,31 @@ impl TableOptions {
 
 /// Options that control how Parquet files are read, including global options
 /// that apply to all columns and optional column-specific overrides
+///
+/// Closely tied to 
[`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
+/// Properties not included in [`TableParquetOptions`] may not be configurable 
at the external API
+/// (e.g. sorting_columns).
 #[derive(Clone, Default, Debug, PartialEq)]
 pub struct TableParquetOptions {
     /// Global Parquet options that propagates to all columns.
     pub global: ParquetOptions,
     /// Column specific options. Default usage is parquet.XX::column.
     pub column_specific_options: HashMap<String, ColumnOptions>,
+    /// Additional file-level metadata to include. Inserted into the 
key_value_metadata
+    /// for the written 
[`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
+    ///
+    /// Multiple entries are permitted
+    /// ```sql
+    /// OPTIONS (
+    ///    'format.metadata::key1' '',
+    ///    'format.metadata::key2' 'value',
+    ///    'format.metadata::key3' 'value has spaces',
+    ///    'format.metadata::key4' 'value has special chars :: :',
+    ///    'format.metadata::key_dupe' 'original will be overwritten',
+    ///    'format.metadata::key_dupe' 'final'
+    /// )
+    /// ```
+    pub key_value_metadata: HashMap<String, Option<String>>,
 }
 
 impl ConfigField for TableParquetOptions {
@@ -1380,8 +1399,24 @@ impl ConfigField for TableParquetOptions {
     }
 
     fn set(&mut self, key: &str, value: &str) -> Result<()> {
-        // Determine the key if it's a global or column-specific setting
-        if key.contains("::") {
+        // Determine if the key is a global, metadata, or column-specific 
setting
+        if key.starts_with("metadata::") {
+            let k =
+                match key.split("::").collect::<Vec<_>>()[..] {
+                    [_meta] | [_meta, ""] => return 
Err(DataFusionError::Configuration(
+                        "Invalid metadata key provided, missing key in 
metadata::<key>"
+                            .to_string(),
+                    )),
+                    [_meta, k] => k.into(),
+                    _ => {
+                        return Err(DataFusionError::Configuration(format!(
+                        "Invalid metadata key provided, found too many '::' in 
\"{key}\""
+                    )))
+                    }
+                };
+            self.key_value_metadata.insert(k, Some(value.into()));
+            Ok(())
+        } else if key.contains("::") {
             self.column_specific_options.set(key, value)
         } else {
             self.global.set(key, value)
@@ -1773,4 +1808,38 @@ mod tests {
             .iter()
             .any(|item| item.key == "format.bloom_filter_enabled::col1"))
     }
+
+    #[cfg(feature = "parquet")]
+    #[test]
+    fn parquet_table_options_config_metadata_entry() {
+        let mut table_config = TableOptions::new();
+        table_config.set_file_format(FileType::PARQUET);
+        table_config.set("format.metadata::key1", "").unwrap();
+        table_config.set("format.metadata::key2", "value2").unwrap();
+        table_config
+            .set("format.metadata::key3", "value with spaces ")
+            .unwrap();
+        table_config
+            .set("format.metadata::key4", "value with special chars :: :")
+            .unwrap();
+
+        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
+        assert_eq!(parsed_metadata.get("should not exist1"), None);
+        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
+        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
+        assert_eq!(
+            parsed_metadata.get("key3"),
+            Some(&Some("value with spaces ".into()))
+        );
+        assert_eq!(
+            parsed_metadata.get("key4"),
+            Some(&Some("value with special chars :: :".into()))
+        );
+
+        // duplicate keys are overwritten
+        table_config.set("format.metadata::key_dupe", "A").unwrap();
+        table_config.set("format.metadata::key_dupe", "B").unwrap();
+        let parsed_metadata = table_config.parquet.key_value_metadata;
+        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
+    }
 }
diff --git a/datafusion/common/src/file_options/mod.rs 
b/datafusion/common/src/file_options/mod.rs
index eb1ce1b364..a760619a7b 100644
--- a/datafusion/common/src/file_options/mod.rs
+++ b/datafusion/common/src/file_options/mod.rs
@@ -124,6 +124,10 @@ mod tests {
             123
         );
 
+        // properties which remain as default on WriterProperties
+        assert_eq!(properties.key_value_metadata(), None);
+        assert_eq!(properties.sorting_columns(), None);
+
         Ok(())
     }
 
diff --git a/datafusion/common/src/file_options/parquet_writer.rs 
b/datafusion/common/src/file_options/parquet_writer.rs
index 28e73ba48f..8ac6bcaa7a 100644
--- a/datafusion/common/src/file_options/parquet_writer.rs
+++ b/datafusion/common/src/file_options/parquet_writer.rs
@@ -17,11 +17,17 @@
 
 //! Options related to how parquet files should be written
 
-use crate::{config::TableParquetOptions, DataFusionError, Result};
+use crate::{
+    config::{ParquetOptions, TableParquetOptions},
+    DataFusionError, Result,
+};
 
 use parquet::{
     basic::{BrotliLevel, GzipLevel, ZstdLevel},
-    file::properties::{EnabledStatistics, WriterProperties, WriterVersion},
+    file::{
+        metadata::KeyValue,
+        properties::{EnabledStatistics, WriterProperties, WriterVersion},
+    },
     schema::types::ColumnPath,
 };
 
@@ -47,53 +53,87 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions 
{
     type Error = DataFusionError;
 
     fn try_from(parquet_options: &TableParquetOptions) -> Result<Self> {
-        let parquet_session_options = &parquet_options.global;
-        let mut builder = WriterProperties::builder()
-            
.set_data_page_size_limit(parquet_session_options.data_pagesize_limit)
-            .set_write_batch_size(parquet_session_options.write_batch_size)
-            .set_writer_version(parse_version_string(
-                &parquet_session_options.writer_version,
-            )?)
-            .set_dictionary_page_size_limit(
-                parquet_session_options.dictionary_page_size_limit,
-            )
-            .set_max_row_group_size(parquet_session_options.max_row_group_size)
-            .set_created_by(parquet_session_options.created_by.clone())
-            .set_column_index_truncate_length(
-                parquet_session_options.column_index_truncate_length,
+        let ParquetOptions {
+            data_pagesize_limit,
+            write_batch_size,
+            writer_version,
+            dictionary_page_size_limit,
+            max_row_group_size,
+            created_by,
+            column_index_truncate_length,
+            data_page_row_count_limit,
+            bloom_filter_enabled,
+            encoding,
+            dictionary_enabled,
+            compression,
+            statistics_enabled,
+            max_statistics_size,
+            bloom_filter_fpp,
+            bloom_filter_ndv,
+            //  below is not part of ParquetWriterOptions
+            enable_page_index: _,
+            pruning: _,
+            skip_metadata: _,
+            metadata_size_hint: _,
+            pushdown_filters: _,
+            reorder_filters: _,
+            allow_single_file_parallelism: _,
+            maximum_parallel_row_group_writers: _,
+            maximum_buffered_record_batches_per_stream: _,
+        } = &parquet_options.global;
+
+        let key_value_metadata = if 
!parquet_options.key_value_metadata.is_empty() {
+            Some(
+                parquet_options
+                    .key_value_metadata
+                    .clone()
+                    .drain()
+                    .map(|(key, value)| KeyValue { key, value })
+                    .collect::<Vec<_>>(),
             )
-            .set_data_page_row_count_limit(
-                parquet_session_options.data_page_row_count_limit,
-            )
-            
.set_bloom_filter_enabled(parquet_session_options.bloom_filter_enabled);
+        } else {
+            None
+        };
 
-        if let Some(encoding) = &parquet_session_options.encoding {
+        let mut builder = WriterProperties::builder()
+            .set_data_page_size_limit(*data_pagesize_limit)
+            .set_write_batch_size(*write_batch_size)
+            .set_writer_version(parse_version_string(writer_version.as_str())?)
+            .set_dictionary_page_size_limit(*dictionary_page_size_limit)
+            .set_max_row_group_size(*max_row_group_size)
+            .set_created_by(created_by.clone())
+            .set_column_index_truncate_length(*column_index_truncate_length)
+            .set_data_page_row_count_limit(*data_page_row_count_limit)
+            .set_bloom_filter_enabled(*bloom_filter_enabled)
+            .set_key_value_metadata(key_value_metadata);
+
+        if let Some(encoding) = &encoding {
             builder = builder.set_encoding(parse_encoding_string(encoding)?);
         }
 
-        if let Some(enabled) = parquet_session_options.dictionary_enabled {
-            builder = builder.set_dictionary_enabled(enabled);
+        if let Some(enabled) = dictionary_enabled {
+            builder = builder.set_dictionary_enabled(*enabled);
         }
 
-        if let Some(compression) = &parquet_session_options.compression {
+        if let Some(compression) = &compression {
             builder = 
builder.set_compression(parse_compression_string(compression)?);
         }
 
-        if let Some(statistics) = &parquet_session_options.statistics_enabled {
+        if let Some(statistics) = &statistics_enabled {
             builder =
                 
builder.set_statistics_enabled(parse_statistics_string(statistics)?);
         }
 
-        if let Some(size) = parquet_session_options.max_statistics_size {
-            builder = builder.set_max_statistics_size(size);
+        if let Some(size) = max_statistics_size {
+            builder = builder.set_max_statistics_size(*size);
         }
 
-        if let Some(fpp) = parquet_session_options.bloom_filter_fpp {
-            builder = builder.set_bloom_filter_fpp(fpp);
+        if let Some(fpp) = bloom_filter_fpp {
+            builder = builder.set_bloom_filter_fpp(*fpp);
         }
 
-        if let Some(ndv) = parquet_session_options.bloom_filter_ndv {
-            builder = builder.set_bloom_filter_ndv(ndv);
+        if let Some(ndv) = bloom_filter_ndv {
+            builder = builder.set_bloom_filter_ndv(*ndv);
         }
 
         for (column, options) in &parquet_options.column_specific_options {
@@ -141,6 +181,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions 
{
                     builder.set_column_max_statistics_size(path, 
max_statistics_size);
             }
         }
+
+        // ParquetWriterOptions will have defaults for the remaining fields 
(e.g. sorting_columns)
         Ok(ParquetWriterOptions {
             writer_options: builder.build(),
         })
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 66f506f9aa..7ec7d4540f 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1129,7 +1129,7 @@ mod tests {
     };
     use parquet::arrow::arrow_reader::ArrowReaderOptions;
     use parquet::arrow::ParquetRecordBatchStreamBuilder;
-    use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
+    use parquet::file::metadata::{KeyValue, ParquetColumnIndex, 
ParquetOffsetIndex};
     use parquet::file::page_index::index::Index;
     use tokio::fs::File;
 
@@ -1857,7 +1857,13 @@ mod tests {
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
-            TableParquetOptions::default(),
+            TableParquetOptions {
+                key_value_metadata: std::collections::HashMap::from([
+                    ("my-data".to_string(), Some("stuff".to_string())),
+                    ("my-data-bool-key".to_string(), None),
+                ]),
+                ..Default::default()
+            },
         ));
 
         // create data
@@ -1891,7 +1897,10 @@ mod tests {
         let (
             path,
             FileMetaData {
-                num_rows, schema, ..
+                num_rows,
+                schema,
+                key_value_metadata,
+                ..
             },
         ) = written.take(1).next().unwrap();
         let path_parts = path.parts().collect::<Vec<_>>();
@@ -1907,6 +1916,20 @@ mod tests {
             "output file metadata should contain col b"
         );
 
+        let mut key_value_metadata = key_value_metadata.unwrap();
+        key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key));
+        let expected_metadata = vec![
+            KeyValue {
+                key: "my-data".to_string(),
+                value: Some("stuff".to_string()),
+            },
+            KeyValue {
+                key: "my-data-bool-key".to_string(),
+                value: None,
+            },
+        ];
+        assert_eq!(key_value_metadata, expected_metadata);
+
         Ok(())
     }
 
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs 
b/datafusion/proto/src/physical_plan/from_proto.rs
index 6184332ea5..1d3edb7b60 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -969,6 +969,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for 
TableParquetOptions {
                 .unwrap()
                 .unwrap(),
             column_specific_options,
+            key_value_metadata: Default::default(),
         })
     }
 }
diff --git a/datafusion/sqllogictest/test_files/copy.slt 
b/datafusion/sqllogictest/test_files/copy.slt
index 502dfd4fa6..d695e8514b 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -283,11 +283,73 @@ OPTIONS (
 'format.statistics_enabled::col2' none,
 'format.max_statistics_size' 123,
 'format.bloom_filter_fpp' 0.001,
-'format.bloom_filter_ndv' 100
+'format.bloom_filter_ndv' 100,
+'format.metadata::key' 'value'
 )
 ----
 2
 
+# valid vs invalid metadata
+
+# accepts map with a single entry
+statement ok
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+    'format.metadata::key' 'value'
+)
+
+# accepts multiple entries (on different keys)
+statement ok
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+    'format.metadata::key1' '',
+    'format.metadata::key2' 'value',
+    'format.metadata::key3' 'value with spaces',
+    'format.metadata::key4' 'value with special chars :: :'
+)
+
+# accepts multiple entries with the same key (will overwrite)
+statement ok
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+    'format.metadata::key1' 'value',
+    'format.metadata::key1' 'value'
+)
+
+# errors if key is missing
+statement error DataFusion error: Invalid or Unsupported Configuration: 
Invalid metadata key provided, missing key in metadata::<key>
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+    'format.metadata::' 'value'
+)
+
+# errors if key contains internal '::'
+statement error DataFusion error: Invalid or Unsupported Configuration: 
Invalid metadata key provided, found too many '::' in "metadata::key::extra"
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+    'format.metadata::key::extra' 'value'
+)
+
+# errors for invalid property (not stating `format.metadata`)
+statement error DataFusion error: Invalid or Unsupported Configuration: Config 
value "wrong-metadata" not found on ColumnOptions
+COPY source_table
+TO 'test_files/scratch/copy/table_with_metadata/'
+STORED AS PARQUET
+OPTIONS (
+    'format.wrong-metadata::key' 'value'
+)
+
+
 # validate multiple parquet file output with all options set
 statement ok
 CREATE EXTERNAL TABLE validate_parquet_with_options STORED AS PARQUET LOCATION 
'test_files/scratch/copy/table_with_options/';


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to