This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bf43bb2eed Add partial serde support for ParquetWriterOptions (#8627)
bf43bb2eed is described below

commit bf43bb2eed304369c078637bc84d1b842c24b399
Author: Andy Grove <[email protected]>
AuthorDate: Sat Dec 23 09:25:07 2023 -0700

    Add partial serde support for ParquetWriterOptions (#8627)
    
    * Add serde support for ParquetWriterOptions
    
    * save progress
    
    * test passes
    
    * Improve test
    
    * Refactor and add link to follow on issue
    
    * remove duplicate code
    
    * clippy
    
    * Regen
    
    * remove comments from proto file
    
    * change proto types from i32 to u32 per feedback on PR
    
    * change to u64
---
 datafusion/proto/proto/datafusion.proto            |  15 +
 datafusion/proto/src/generated/pbjson.rs           | 321 +++++++++++++++++++++
 datafusion/proto/src/generated/prost.rs            |  28 +-
 datafusion/proto/src/logical_plan/mod.rs           | 146 ++++++++--
 datafusion/proto/src/physical_plan/from_proto.rs   |   7 +
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  41 ++-
 6 files changed, 524 insertions(+), 34 deletions(-)

diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 05f0b64343..d02fc8e91b 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1206,6 +1206,7 @@ message PartitionColumn {
 message FileTypeWriterOptions {
   oneof FileType {
     JsonWriterOptions json_options = 1;
+    ParquetWriterOptions parquet_options = 2;
   }
 }
 
@@ -1213,6 +1214,20 @@ message JsonWriterOptions {
   CompressionTypeVariant compression = 1;
 }
 
+message ParquetWriterOptions {
+    WriterProperties writer_properties = 1;
+}
+
+message WriterProperties {
+  uint64 data_page_size_limit = 1;
+  uint64 dictionary_page_size_limit = 2;
+  uint64 data_page_row_count_limit = 3;
+  uint64 write_batch_size = 4;
+  uint64 max_row_group_size = 5;
+  string writer_version = 6;
+  string created_by = 7;
+}
+
 message FileSinkConfig {
   reserved 6; // writer_mode
 
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 0fdeab0a40..f860b1f1e6 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -7890,6 +7890,9 @@ impl serde::Serialize for FileTypeWriterOptions {
                 file_type_writer_options::FileType::JsonOptions(v) => {
                     struct_ser.serialize_field("jsonOptions", v)?;
                 }
+                file_type_writer_options::FileType::ParquetOptions(v) => {
+                    struct_ser.serialize_field("parquetOptions", v)?;
+                }
             }
         }
         struct_ser.end()
@@ -7904,11 +7907,14 @@ impl<'de> serde::Deserialize<'de> for 
FileTypeWriterOptions {
         const FIELDS: &[&str] = &[
             "json_options",
             "jsonOptions",
+            "parquet_options",
+            "parquetOptions",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             JsonOptions,
+            ParquetOptions,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -7931,6 +7937,7 @@ impl<'de> serde::Deserialize<'de> for 
FileTypeWriterOptions {
                     {
                         match value {
                             "jsonOptions" | "json_options" => 
Ok(GeneratedField::JsonOptions),
+                            "parquetOptions" | "parquet_options" => 
Ok(GeneratedField::ParquetOptions),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -7958,6 +7965,13 @@ impl<'de> serde::Deserialize<'de> for 
FileTypeWriterOptions {
                                 return 
Err(serde::de::Error::duplicate_field("jsonOptions"));
                             }
                             file_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(file_type_writer_options::FileType::JsonOptions)
+;
+                        }
+                        GeneratedField::ParquetOptions => {
+                            if file_type__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("parquetOptions"));
+                            }
+                            file_type__ = 
map_.next_value::<::std::option::Option<_>>()?.map(file_type_writer_options::FileType::ParquetOptions)
 ;
                         }
                     }
@@ -15171,6 +15185,98 @@ impl<'de> serde::Deserialize<'de> for 
ParquetScanExecNode {
         deserializer.deserialize_struct("datafusion.ParquetScanExecNode", 
FIELDS, GeneratedVisitor)
     }
 }
+impl serde::Serialize for ParquetWriterOptions {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.writer_properties.is_some() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.ParquetWriterOptions", len)?;
+        if let Some(v) = self.writer_properties.as_ref() {
+            struct_ser.serialize_field("writerProperties", v)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for ParquetWriterOptions {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "writer_properties",
+            "writerProperties",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            WriterProperties,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "writerProperties" | "writer_properties" => 
Ok(GeneratedField::WriterProperties),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = ParquetWriterOptions;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.ParquetWriterOptions")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<ParquetWriterOptions, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut writer_properties__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::WriterProperties => {
+                            if writer_properties__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("writerProperties"));
+                            }
+                            writer_properties__ = map_.next_value()?;
+                        }
+                    }
+                }
+                Ok(ParquetWriterOptions {
+                    writer_properties: writer_properties__,
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.ParquetWriterOptions", 
FIELDS, GeneratedVisitor)
+    }
+}
 impl serde::Serialize for PartialTableReference {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
@@ -27144,3 +27250,218 @@ impl<'de> serde::Deserialize<'de> for WindowNode {
         deserializer.deserialize_struct("datafusion.WindowNode", FIELDS, 
GeneratedVisitor)
     }
 }
+impl serde::Serialize for WriterProperties {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        use serde::ser::SerializeStruct;
+        let mut len = 0;
+        if self.data_page_size_limit != 0 {
+            len += 1;
+        }
+        if self.dictionary_page_size_limit != 0 {
+            len += 1;
+        }
+        if self.data_page_row_count_limit != 0 {
+            len += 1;
+        }
+        if self.write_batch_size != 0 {
+            len += 1;
+        }
+        if self.max_row_group_size != 0 {
+            len += 1;
+        }
+        if !self.writer_version.is_empty() {
+            len += 1;
+        }
+        if !self.created_by.is_empty() {
+            len += 1;
+        }
+        let mut struct_ser = 
serializer.serialize_struct("datafusion.WriterProperties", len)?;
+        if self.data_page_size_limit != 0 {
+            #[allow(clippy::needless_borrow)]
+            struct_ser.serialize_field("dataPageSizeLimit", 
ToString::to_string(&self.data_page_size_limit).as_str())?;
+        }
+        if self.dictionary_page_size_limit != 0 {
+            #[allow(clippy::needless_borrow)]
+            struct_ser.serialize_field("dictionaryPageSizeLimit", 
ToString::to_string(&self.dictionary_page_size_limit).as_str())?;
+        }
+        if self.data_page_row_count_limit != 0 {
+            #[allow(clippy::needless_borrow)]
+            struct_ser.serialize_field("dataPageRowCountLimit", 
ToString::to_string(&self.data_page_row_count_limit).as_str())?;
+        }
+        if self.write_batch_size != 0 {
+            #[allow(clippy::needless_borrow)]
+            struct_ser.serialize_field("writeBatchSize", 
ToString::to_string(&self.write_batch_size).as_str())?;
+        }
+        if self.max_row_group_size != 0 {
+            #[allow(clippy::needless_borrow)]
+            struct_ser.serialize_field("maxRowGroupSize", 
ToString::to_string(&self.max_row_group_size).as_str())?;
+        }
+        if !self.writer_version.is_empty() {
+            struct_ser.serialize_field("writerVersion", &self.writer_version)?;
+        }
+        if !self.created_by.is_empty() {
+            struct_ser.serialize_field("createdBy", &self.created_by)?;
+        }
+        struct_ser.end()
+    }
+}
+impl<'de> serde::Deserialize<'de> for WriterProperties {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "data_page_size_limit",
+            "dataPageSizeLimit",
+            "dictionary_page_size_limit",
+            "dictionaryPageSizeLimit",
+            "data_page_row_count_limit",
+            "dataPageRowCountLimit",
+            "write_batch_size",
+            "writeBatchSize",
+            "max_row_group_size",
+            "maxRowGroupSize",
+            "writer_version",
+            "writerVersion",
+            "created_by",
+            "createdBy",
+        ];
+
+        #[allow(clippy::enum_variant_names)]
+        enum GeneratedField {
+            DataPageSizeLimit,
+            DictionaryPageSizeLimit,
+            DataPageRowCountLimit,
+            WriteBatchSize,
+            MaxRowGroupSize,
+            WriterVersion,
+            CreatedBy,
+        }
+        impl<'de> serde::Deserialize<'de> for GeneratedField {
+            fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
+            where
+                D: serde::Deserializer<'de>,
+            {
+                struct GeneratedVisitor;
+
+                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+                    type Value = GeneratedField;
+
+                    fn expecting(&self, formatter: &mut 
std::fmt::Formatter<'_>) -> std::fmt::Result {
+                        write!(formatter, "expected one of: {:?}", &FIELDS)
+                    }
+
+                    #[allow(unused_variables)]
+                    fn visit_str<E>(self, value: &str) -> 
std::result::Result<GeneratedField, E>
+                    where
+                        E: serde::de::Error,
+                    {
+                        match value {
+                            "dataPageSizeLimit" | "data_page_size_limit" => 
Ok(GeneratedField::DataPageSizeLimit),
+                            "dictionaryPageSizeLimit" | 
"dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit),
+                            "dataPageRowCountLimit" | 
"data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit),
+                            "writeBatchSize" | "write_batch_size" => 
Ok(GeneratedField::WriteBatchSize),
+                            "maxRowGroupSize" | "max_row_group_size" => 
Ok(GeneratedField::MaxRowGroupSize),
+                            "writerVersion" | "writer_version" => 
Ok(GeneratedField::WriterVersion),
+                            "createdBy" | "created_by" => 
Ok(GeneratedField::CreatedBy),
+                            _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
+                        }
+                    }
+                }
+                deserializer.deserialize_identifier(GeneratedVisitor)
+            }
+        }
+        struct GeneratedVisitor;
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = WriterProperties;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                formatter.write_str("struct datafusion.WriterProperties")
+            }
+
+            fn visit_map<V>(self, mut map_: V) -> 
std::result::Result<WriterProperties, V::Error>
+                where
+                    V: serde::de::MapAccess<'de>,
+            {
+                let mut data_page_size_limit__ = None;
+                let mut dictionary_page_size_limit__ = None;
+                let mut data_page_row_count_limit__ = None;
+                let mut write_batch_size__ = None;
+                let mut max_row_group_size__ = None;
+                let mut writer_version__ = None;
+                let mut created_by__ = None;
+                while let Some(k) = map_.next_key()? {
+                    match k {
+                        GeneratedField::DataPageSizeLimit => {
+                            if data_page_size_limit__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("dataPageSizeLimit"));
+                            }
+                            data_page_size_limit__ = 
+                                
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+                            ;
+                        }
+                        GeneratedField::DictionaryPageSizeLimit => {
+                            if dictionary_page_size_limit__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit"));
+                            }
+                            dictionary_page_size_limit__ = 
+                                
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+                            ;
+                        }
+                        GeneratedField::DataPageRowCountLimit => {
+                            if data_page_row_count_limit__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("dataPageRowCountLimit"));
+                            }
+                            data_page_row_count_limit__ = 
+                                
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+                            ;
+                        }
+                        GeneratedField::WriteBatchSize => {
+                            if write_batch_size__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("writeBatchSize"));
+                            }
+                            write_batch_size__ = 
+                                
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+                            ;
+                        }
+                        GeneratedField::MaxRowGroupSize => {
+                            if max_row_group_size__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("maxRowGroupSize"));
+                            }
+                            max_row_group_size__ = 
+                                
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+                            ;
+                        }
+                        GeneratedField::WriterVersion => {
+                            if writer_version__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("writerVersion"));
+                            }
+                            writer_version__ = Some(map_.next_value()?);
+                        }
+                        GeneratedField::CreatedBy => {
+                            if created_by__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("createdBy"));
+                            }
+                            created_by__ = Some(map_.next_value()?);
+                        }
+                    }
+                }
+                Ok(WriterProperties {
+                    data_page_size_limit: 
data_page_size_limit__.unwrap_or_default(),
+                    dictionary_page_size_limit: 
dictionary_page_size_limit__.unwrap_or_default(),
+                    data_page_row_count_limit: 
data_page_row_count_limit__.unwrap_or_default(),
+                    write_batch_size: write_batch_size__.unwrap_or_default(),
+                    max_row_group_size: 
max_row_group_size__.unwrap_or_default(),
+                    writer_version: writer_version__.unwrap_or_default(),
+                    created_by: created_by__.unwrap_or_default(),
+                })
+            }
+        }
+        deserializer.deserialize_struct("datafusion.WriterProperties", FIELDS, 
GeneratedVisitor)
+    }
+}
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index e44355859d..459d5a965c 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1642,7 +1642,7 @@ pub struct PartitionColumn {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FileTypeWriterOptions {
-    #[prost(oneof = "file_type_writer_options::FileType", tags = "1")]
+    #[prost(oneof = "file_type_writer_options::FileType", tags = "1, 2")]
     pub file_type: ::core::option::Option<file_type_writer_options::FileType>,
 }
 /// Nested message and enum types in `FileTypeWriterOptions`.
@@ -1652,6 +1652,8 @@ pub mod file_type_writer_options {
     pub enum FileType {
         #[prost(message, tag = "1")]
         JsonOptions(super::JsonWriterOptions),
+        #[prost(message, tag = "2")]
+        ParquetOptions(super::ParquetWriterOptions),
     }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
@@ -1662,6 +1664,30 @@ pub struct JsonWriterOptions {
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ParquetWriterOptions {
+    #[prost(message, optional, tag = "1")]
+    pub writer_properties: ::core::option::Option<WriterProperties>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct WriterProperties {
+    #[prost(uint64, tag = "1")]
+    pub data_page_size_limit: u64,
+    #[prost(uint64, tag = "2")]
+    pub dictionary_page_size_limit: u64,
+    #[prost(uint64, tag = "3")]
+    pub data_page_row_count_limit: u64,
+    #[prost(uint64, tag = "4")]
+    pub write_batch_size: u64,
+    #[prost(uint64, tag = "5")]
+    pub max_row_group_size: u64,
+    #[prost(string, tag = "6")]
+    pub writer_version: ::prost::alloc::string::String,
+    #[prost(string, tag = "7")]
+    pub created_by: ::prost::alloc::string::String,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
 pub struct FileSinkConfig {
     #[prost(string, tag = "1")]
     pub object_store_url: ::prost::alloc::string::String,
diff --git a/datafusion/proto/src/logical_plan/mod.rs 
b/datafusion/proto/src/logical_plan/mod.rs
index e03b3ffa7b..d137a41fa1 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -23,7 +23,8 @@ use std::sync::Arc;
 use crate::common::{byte_to_string, proto_error, str_to_byte};
 use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
 use crate::protobuf::{
-    copy_to_node, CustomTableScanNode, LogicalExprNodeCollection, SqlOption,
+    copy_to_node, file_type_writer_options, CustomTableScanNode,
+    LogicalExprNodeCollection, SqlOption,
 };
 use crate::{
     convert_required,
@@ -49,7 +50,7 @@ use datafusion::{
 use datafusion_common::{
     context, file_options::StatementOptions, internal_err, not_impl_err,
     parsers::CompressionTypeVariant, plan_datafusion_err, DataFusionError, 
FileType,
-    OwnedTableReference, Result,
+    FileTypeWriterOptions, OwnedTableReference, Result,
 };
 use datafusion_expr::{
     dml,
@@ -62,6 +63,8 @@ use datafusion_expr::{
     DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder,
 };
 
+use datafusion::parquet::file::properties::{WriterProperties, WriterVersion};
+use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
 use datafusion_expr::dml::CopyOptions;
 use prost::bytes::BufMut;
 use prost::Message;
@@ -833,19 +836,48 @@ impl AsLogicalPlan for LogicalPlanNode {
 
                 let copy_options = match &copy.copy_options {
                     Some(copy_to_node::CopyOptions::SqlOptions(opt)) => {
-                        let options = opt.option.iter().map(|o| 
(o.key.clone(), o.value.clone())).collect();
-                        CopyOptions::SQLOptions(StatementOptions::from(
-                            &options,
-                        ))
+                        let options = opt
+                            .option
+                            .iter()
+                            .map(|o| (o.key.clone(), o.value.clone()))
+                            .collect();
+                        
CopyOptions::SQLOptions(StatementOptions::from(&options))
                     }
-                    Some(copy_to_node::CopyOptions::WriterOptions(_)) => {
-                        return Err(proto_error(
-                            "LogicalPlan serde is not yet implemented for 
CopyTo with WriterOptions",
-                        ))
+                    Some(copy_to_node::CopyOptions::WriterOptions(opt)) => {
+                        match &opt.file_type {
+                            Some(ft) => match ft {
+                                
file_type_writer_options::FileType::ParquetOptions(
+                                    writer_options,
+                                ) => {
+                                    let writer_properties =
+                                        match 
&writer_options.writer_properties {
+                                            Some(serialized_writer_options) => 
{
+                                                writer_properties_from_proto(
+                                                    serialized_writer_options,
+                                                )?
+                                            }
+                                            _ => WriterProperties::default(),
+                                        };
+                                    CopyOptions::WriterOptions(Box::new(
+                                        FileTypeWriterOptions::Parquet(
+                                            
ParquetWriterOptions::new(writer_properties),
+                                        ),
+                                    ))
+                                }
+                                _ => {
+                                    return Err(proto_error(
+                                        "WriterOptions unsupported file_type",
+                                    ))
+                                }
+                            },
+                            None => {
+                                return Err(proto_error(
+                                    "WriterOptions missing file_type",
+                                ))
+                            }
+                        }
                     }
-                    other => return Err(proto_error(format!(
-                        "LogicalPlan serde is not yet implemented for CopyTo 
with CopyOptions {other:?}",
-                    )))
+                    None => return Err(proto_error("CopyTo missing 
CopyOptions")),
                 };
                 Ok(datafusion_expr::LogicalPlan::Copy(
                     datafusion_expr::dml::CopyTo {
@@ -1580,22 +1612,48 @@ impl AsLogicalPlan for LogicalPlanNode {
                     extension_codec,
                 )?;
 
-                let copy_options_proto: Option<copy_to_node::CopyOptions> = 
match copy_options {
-                    CopyOptions::SQLOptions(opt) => {
-                        let options: Vec<SqlOption> = 
opt.clone().into_inner().iter().map(|(k, v)| SqlOption {
-                            key: k.to_string(),
-                            value: v.to_string(),
-                        }).collect();
-                        
Some(copy_to_node::CopyOptions::SqlOptions(protobuf::SqlOptions {
-                            option: options
-                        }))
-                    }
-                    CopyOptions::WriterOptions(_) => {
-                        return Err(proto_error(
-                            "LogicalPlan serde is not yet implemented for 
CopyTo with WriterOptions",
-                        ))
-                    }
-                };
+                let copy_options_proto: Option<copy_to_node::CopyOptions> =
+                    match copy_options {
+                        CopyOptions::SQLOptions(opt) => {
+                            let options: Vec<SqlOption> = opt
+                                .clone()
+                                .into_inner()
+                                .iter()
+                                .map(|(k, v)| SqlOption {
+                                    key: k.to_string(),
+                                    value: v.to_string(),
+                                })
+                                .collect();
+                            Some(copy_to_node::CopyOptions::SqlOptions(
+                                protobuf::SqlOptions { option: options },
+                            ))
+                        }
+                        CopyOptions::WriterOptions(opt) => {
+                            match opt.as_ref() {
+                                FileTypeWriterOptions::Parquet(parquet_opts) 
=> {
+                                    let parquet_writer_options =
+                                        protobuf::ParquetWriterOptions {
+                                            writer_properties: Some(
+                                                writer_properties_to_proto(
+                                                    
&parquet_opts.writer_options,
+                                                ),
+                                            ),
+                                        };
+                                    let parquet_options = 
file_type_writer_options::FileType::ParquetOptions(parquet_writer_options);
+                                    
Some(copy_to_node::CopyOptions::WriterOptions(
+                                        protobuf::FileTypeWriterOptions {
+                                            file_type: Some(parquet_options),
+                                        },
+                                    ))
+                                }
+                                _ => {
+                                    return Err(proto_error(
+                                        "Unsupported FileTypeWriterOptions in 
CopyTo",
+                                    ))
+                                }
+                            }
+                        }
+                    };
 
                 Ok(protobuf::LogicalPlanNode {
                     logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
@@ -1615,3 +1673,33 @@ impl AsLogicalPlan for LogicalPlanNode {
         }
     }
 }
+
+pub(crate) fn writer_properties_to_proto(
+    props: &WriterProperties,
+) -> protobuf::WriterProperties {
+    protobuf::WriterProperties {
+        data_page_size_limit: props.data_page_size_limit() as u64,
+        dictionary_page_size_limit: props.dictionary_page_size_limit() as u64,
+        data_page_row_count_limit: props.data_page_row_count_limit() as u64,
+        write_batch_size: props.write_batch_size() as u64,
+        max_row_group_size: props.max_row_group_size() as u64,
+        writer_version: format!("{:?}", props.writer_version()),
+        created_by: props.created_by().to_string(),
+    }
+}
+
+pub(crate) fn writer_properties_from_proto(
+    props: &protobuf::WriterProperties,
+) -> Result<WriterProperties, DataFusionError> {
+    let writer_version = WriterVersion::from_str(&props.writer_version)
+        .map_err(|e| proto_error(e.to_string()))?;
+    Ok(WriterProperties::builder()
+        .set_created_by(props.created_by.clone())
+        .set_writer_version(writer_version)
+        .set_dictionary_page_size_limit(props.dictionary_page_size_limit as 
usize)
+        .set_data_page_row_count_limit(props.data_page_row_count_limit as 
usize)
+        .set_data_page_size_limit(props.data_page_size_limit as usize)
+        .set_write_batch_size(props.write_batch_size as usize)
+        .set_max_row_group_size(props.max_row_group_size as usize)
+        .build())
+}
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs 
b/datafusion/proto/src/physical_plan/from_proto.rs
index 65f9f139a8..824eb60a57 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -40,6 +40,7 @@ use datafusion::physical_plan::{
     functions, ColumnStatistics, Partitioning, PhysicalExpr, Statistics, 
WindowExpr,
 };
 use datafusion_common::file_options::json_writer::JsonWriterOptions;
+use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
@@ -52,6 +53,7 @@ use crate::logical_plan;
 use crate::protobuf;
 use crate::protobuf::physical_expr_node::ExprType;
 
+use crate::logical_plan::writer_properties_from_proto;
 use chrono::{TimeZone, Utc};
 use object_store::path::Path;
 use object_store::ObjectMeta;
@@ -769,6 +771,11 @@ impl TryFrom<&protobuf::FileTypeWriterOptions> for FileTypeWriterOptions {
             protobuf::file_type_writer_options::FileType::JsonOptions(opts) => Ok(
                 Self::JSON(JsonWriterOptions::new(opts.compression().into())),
             ),
+            protobuf::file_type_writer_options::FileType::ParquetOptions(opt) => {
+                let props = opt.writer_properties.clone().unwrap_or_default();
+                let writer_properties = writer_properties_from_proto(&props)?;
+                Ok(Self::Parquet(ParquetWriterOptions::new(writer_properties)))
+            }
         }
     }
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 9798b06f47..3eeae01a64 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -31,7 +31,7 @@ use datafusion::datasource::provider::TableProviderFactory;
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::parquet::file::properties::{WriterProperties, WriterVersion};
 use datafusion::physical_plan::functions::make_scalar_function;
 use datafusion::prelude::{create_udf, CsvReadOptions, SessionConfig, SessionContext};
 use datafusion::test_util::{TestTableFactory, TestTableProvider};
@@ -330,7 +330,6 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
 }
 
 #[tokio::test]
-#[ignore] // see https://github.com/apache/arrow-datafusion/issues/8619
 async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
     let ctx = SessionContext::new();
 
@@ -339,11 +338,17 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
     let writer_properties = WriterProperties::builder()
         .set_bloom_filter_enabled(true)
         .set_created_by("DataFusion Test".to_string())
+        .set_writer_version(WriterVersion::PARQUET_2_0)
+        .set_write_batch_size(111)
+        .set_data_page_size_limit(222)
+        .set_data_page_row_count_limit(333)
+        .set_dictionary_page_size_limit(444)
+        .set_max_row_group_size(555)
         .build();
     let plan = LogicalPlan::Copy(CopyTo {
         input: Arc::new(input),
-        output_url: "test.csv".to_string(),
-        file_format: FileType::CSV,
+        output_url: "test.parquet".to_string(),
+        file_format: FileType::PARQUET,
         single_file_output: true,
         copy_options: CopyOptions::WriterOptions(Box::new(
             FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)),
@@ -354,6 +359,34 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
     let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
     assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
 
+    match logical_round_trip {
+        LogicalPlan::Copy(copy_to) => {
+            assert_eq!("test.parquet", copy_to.output_url);
+            assert_eq!(FileType::PARQUET, copy_to.file_format);
+            assert!(copy_to.single_file_output);
+            match &copy_to.copy_options {
+                CopyOptions::WriterOptions(y) => match y.as_ref() {
+                    FileTypeWriterOptions::Parquet(p) => {
+                        let props = &p.writer_options;
+                        assert_eq!("DataFusion Test", props.created_by());
+                        assert_eq!(
+                            "PARQUET_2_0",
+                            format!("{:?}", props.writer_version())
+                        );
+                        assert_eq!(111, props.write_batch_size());
+                        assert_eq!(222, props.data_page_size_limit());
+                        assert_eq!(333, props.data_page_row_count_limit());
+                        assert_eq!(444, props.dictionary_page_size_limit());
+                        assert_eq!(555, props.max_row_group_size());
+                    }
+                    _ => panic!(),
+                },
+                _ => panic!(),
+            }
+        }
+        _ => panic!(),
+    }
+
     Ok(())
 }
 


Reply via email to