This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bf43bb2eed Add partial serde support for ParquetWriterOptions (#8627)
bf43bb2eed is described below
commit bf43bb2eed304369c078637bc84d1b842c24b399
Author: Andy Grove <[email protected]>
AuthorDate: Sat Dec 23 09:25:07 2023 -0700
Add partial serde support for ParquetWriterOptions (#8627)
* Add serde support for ParquetWriterOptions
* save progress
* test passes
* Improve test
* Refactor and add link to follow on issue
* remove duplicate code
* clippy
* Regen
* remove comments from proto file
* change proto types from i32 to u32 per feedback on PR
* change to u64
---
datafusion/proto/proto/datafusion.proto | 15 +
datafusion/proto/src/generated/pbjson.rs | 321 +++++++++++++++++++++
datafusion/proto/src/generated/prost.rs | 28 +-
datafusion/proto/src/logical_plan/mod.rs | 146 ++++++++--
datafusion/proto/src/physical_plan/from_proto.rs | 7 +
.../proto/tests/cases/roundtrip_logical_plan.rs | 41 ++-
6 files changed, 524 insertions(+), 34 deletions(-)
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 05f0b64343..d02fc8e91b 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1206,6 +1206,7 @@ message PartitionColumn {
message FileTypeWriterOptions {
oneof FileType {
JsonWriterOptions json_options = 1;
+ ParquetWriterOptions parquet_options = 2;
}
}
@@ -1213,6 +1214,20 @@ message JsonWriterOptions {
CompressionTypeVariant compression = 1;
}
+message ParquetWriterOptions {
+ WriterProperties writer_properties = 1;
+}
+
+message WriterProperties {
+ uint64 data_page_size_limit = 1;
+ uint64 dictionary_page_size_limit = 2;
+ uint64 data_page_row_count_limit = 3;
+ uint64 write_batch_size = 4;
+ uint64 max_row_group_size = 5;
+ string writer_version = 6;
+ string created_by = 7;
+}
+
message FileSinkConfig {
reserved 6; // writer_mode
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 0fdeab0a40..f860b1f1e6 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -7890,6 +7890,9 @@ impl serde::Serialize for FileTypeWriterOptions {
file_type_writer_options::FileType::JsonOptions(v) => {
struct_ser.serialize_field("jsonOptions", v)?;
}
+ file_type_writer_options::FileType::ParquetOptions(v) => {
+ struct_ser.serialize_field("parquetOptions", v)?;
+ }
}
}
struct_ser.end()
@@ -7904,11 +7907,14 @@ impl<'de> serde::Deserialize<'de> for
FileTypeWriterOptions {
const FIELDS: &[&str] = &[
"json_options",
"jsonOptions",
+ "parquet_options",
+ "parquetOptions",
];
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
JsonOptions,
+ ParquetOptions,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -7931,6 +7937,7 @@ impl<'de> serde::Deserialize<'de> for
FileTypeWriterOptions {
{
match value {
"jsonOptions" | "json_options" =>
Ok(GeneratedField::JsonOptions),
+ "parquetOptions" | "parquet_options" =>
Ok(GeneratedField::ParquetOptions),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -7958,6 +7965,13 @@ impl<'de> serde::Deserialize<'de> for
FileTypeWriterOptions {
return
Err(serde::de::Error::duplicate_field("jsonOptions"));
}
file_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(file_type_writer_options::FileType::JsonOptions)
+;
+ }
+ GeneratedField::ParquetOptions => {
+ if file_type__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("parquetOptions"));
+ }
+ file_type__ =
map_.next_value::<::std::option::Option<_>>()?.map(file_type_writer_options::FileType::ParquetOptions)
;
}
}
@@ -15171,6 +15185,98 @@ impl<'de> serde::Deserialize<'de> for
ParquetScanExecNode {
deserializer.deserialize_struct("datafusion.ParquetScanExecNode",
FIELDS, GeneratedVisitor)
}
}
+impl serde::Serialize for ParquetWriterOptions {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if self.writer_properties.is_some() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.ParquetWriterOptions", len)?;
+ if let Some(v) = self.writer_properties.as_ref() {
+ struct_ser.serialize_field("writerProperties", v)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for ParquetWriterOptions {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "writer_properties",
+ "writerProperties",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ WriterProperties,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "writerProperties" | "writer_properties" =>
Ok(GeneratedField::WriterProperties),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = ParquetWriterOptions;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.ParquetWriterOptions")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<ParquetWriterOptions, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut writer_properties__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::WriterProperties => {
+ if writer_properties__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("writerProperties"));
+ }
+ writer_properties__ = map_.next_value()?;
+ }
+ }
+ }
+ Ok(ParquetWriterOptions {
+ writer_properties: writer_properties__,
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.ParquetWriterOptions",
FIELDS, GeneratedVisitor)
+ }
+}
impl serde::Serialize for PartialTableReference {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
@@ -27144,3 +27250,218 @@ impl<'de> serde::Deserialize<'de> for WindowNode {
deserializer.deserialize_struct("datafusion.WindowNode", FIELDS,
GeneratedVisitor)
}
}
+impl serde::Serialize for WriterProperties {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
+ where
+ S: serde::Serializer,
+ {
+ use serde::ser::SerializeStruct;
+ let mut len = 0;
+ if self.data_page_size_limit != 0 {
+ len += 1;
+ }
+ if self.dictionary_page_size_limit != 0 {
+ len += 1;
+ }
+ if self.data_page_row_count_limit != 0 {
+ len += 1;
+ }
+ if self.write_batch_size != 0 {
+ len += 1;
+ }
+ if self.max_row_group_size != 0 {
+ len += 1;
+ }
+ if !self.writer_version.is_empty() {
+ len += 1;
+ }
+ if !self.created_by.is_empty() {
+ len += 1;
+ }
+ let mut struct_ser =
serializer.serialize_struct("datafusion.WriterProperties", len)?;
+ if self.data_page_size_limit != 0 {
+ #[allow(clippy::needless_borrow)]
+ struct_ser.serialize_field("dataPageSizeLimit",
ToString::to_string(&self.data_page_size_limit).as_str())?;
+ }
+ if self.dictionary_page_size_limit != 0 {
+ #[allow(clippy::needless_borrow)]
+ struct_ser.serialize_field("dictionaryPageSizeLimit",
ToString::to_string(&self.dictionary_page_size_limit).as_str())?;
+ }
+ if self.data_page_row_count_limit != 0 {
+ #[allow(clippy::needless_borrow)]
+ struct_ser.serialize_field("dataPageRowCountLimit",
ToString::to_string(&self.data_page_row_count_limit).as_str())?;
+ }
+ if self.write_batch_size != 0 {
+ #[allow(clippy::needless_borrow)]
+ struct_ser.serialize_field("writeBatchSize",
ToString::to_string(&self.write_batch_size).as_str())?;
+ }
+ if self.max_row_group_size != 0 {
+ #[allow(clippy::needless_borrow)]
+ struct_ser.serialize_field("maxRowGroupSize",
ToString::to_string(&self.max_row_group_size).as_str())?;
+ }
+ if !self.writer_version.is_empty() {
+ struct_ser.serialize_field("writerVersion", &self.writer_version)?;
+ }
+ if !self.created_by.is_empty() {
+ struct_ser.serialize_field("createdBy", &self.created_by)?;
+ }
+ struct_ser.end()
+ }
+}
+impl<'de> serde::Deserialize<'de> for WriterProperties {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "data_page_size_limit",
+ "dataPageSizeLimit",
+ "dictionary_page_size_limit",
+ "dictionaryPageSizeLimit",
+ "data_page_row_count_limit",
+ "dataPageRowCountLimit",
+ "write_batch_size",
+ "writeBatchSize",
+ "max_row_group_size",
+ "maxRowGroupSize",
+ "writer_version",
+ "writerVersion",
+ "created_by",
+ "createdBy",
+ ];
+
+ #[allow(clippy::enum_variant_names)]
+ enum GeneratedField {
+ DataPageSizeLimit,
+ DictionaryPageSizeLimit,
+ DataPageRowCountLimit,
+ WriteBatchSize,
+ MaxRowGroupSize,
+ WriterVersion,
+ CreatedBy,
+ }
+ impl<'de> serde::Deserialize<'de> for GeneratedField {
+ fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = GeneratedField;
+
+ fn expecting(&self, formatter: &mut
std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ #[allow(unused_variables)]
+ fn visit_str<E>(self, value: &str) ->
std::result::Result<GeneratedField, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "dataPageSizeLimit" | "data_page_size_limit" =>
Ok(GeneratedField::DataPageSizeLimit),
+ "dictionaryPageSizeLimit" |
"dictionary_page_size_limit" => Ok(GeneratedField::DictionaryPageSizeLimit),
+ "dataPageRowCountLimit" |
"data_page_row_count_limit" => Ok(GeneratedField::DataPageRowCountLimit),
+ "writeBatchSize" | "write_batch_size" =>
Ok(GeneratedField::WriteBatchSize),
+ "maxRowGroupSize" | "max_row_group_size" =>
Ok(GeneratedField::MaxRowGroupSize),
+ "writerVersion" | "writer_version" =>
Ok(GeneratedField::WriterVersion),
+ "createdBy" | "created_by" =>
Ok(GeneratedField::CreatedBy),
+ _ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_identifier(GeneratedVisitor)
+ }
+ }
+ struct GeneratedVisitor;
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = WriterProperties;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
+ formatter.write_str("struct datafusion.WriterProperties")
+ }
+
+ fn visit_map<V>(self, mut map_: V) ->
std::result::Result<WriterProperties, V::Error>
+ where
+ V: serde::de::MapAccess<'de>,
+ {
+ let mut data_page_size_limit__ = None;
+ let mut dictionary_page_size_limit__ = None;
+ let mut data_page_row_count_limit__ = None;
+ let mut write_batch_size__ = None;
+ let mut max_row_group_size__ = None;
+ let mut writer_version__ = None;
+ let mut created_by__ = None;
+ while let Some(k) = map_.next_key()? {
+ match k {
+ GeneratedField::DataPageSizeLimit => {
+ if data_page_size_limit__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("dataPageSizeLimit"));
+ }
+ data_page_size_limit__ =
+
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+ ;
+ }
+ GeneratedField::DictionaryPageSizeLimit => {
+ if dictionary_page_size_limit__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("dictionaryPageSizeLimit"));
+ }
+ dictionary_page_size_limit__ =
+
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+ ;
+ }
+ GeneratedField::DataPageRowCountLimit => {
+ if data_page_row_count_limit__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("dataPageRowCountLimit"));
+ }
+ data_page_row_count_limit__ =
+
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+ ;
+ }
+ GeneratedField::WriteBatchSize => {
+ if write_batch_size__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("writeBatchSize"));
+ }
+ write_batch_size__ =
+
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+ ;
+ }
+ GeneratedField::MaxRowGroupSize => {
+ if max_row_group_size__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("maxRowGroupSize"));
+ }
+ max_row_group_size__ =
+
Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+ ;
+ }
+ GeneratedField::WriterVersion => {
+ if writer_version__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("writerVersion"));
+ }
+ writer_version__ = Some(map_.next_value()?);
+ }
+ GeneratedField::CreatedBy => {
+ if created_by__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("createdBy"));
+ }
+ created_by__ = Some(map_.next_value()?);
+ }
+ }
+ }
+ Ok(WriterProperties {
+ data_page_size_limit:
data_page_size_limit__.unwrap_or_default(),
+ dictionary_page_size_limit:
dictionary_page_size_limit__.unwrap_or_default(),
+ data_page_row_count_limit:
data_page_row_count_limit__.unwrap_or_default(),
+ write_batch_size: write_batch_size__.unwrap_or_default(),
+ max_row_group_size:
max_row_group_size__.unwrap_or_default(),
+ writer_version: writer_version__.unwrap_or_default(),
+ created_by: created_by__.unwrap_or_default(),
+ })
+ }
+ }
+ deserializer.deserialize_struct("datafusion.WriterProperties", FIELDS,
GeneratedVisitor)
+ }
+}
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index e44355859d..459d5a965c 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1642,7 +1642,7 @@ pub struct PartitionColumn {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FileTypeWriterOptions {
- #[prost(oneof = "file_type_writer_options::FileType", tags = "1")]
+ #[prost(oneof = "file_type_writer_options::FileType", tags = "1, 2")]
pub file_type: ::core::option::Option<file_type_writer_options::FileType>,
}
/// Nested message and enum types in `FileTypeWriterOptions`.
@@ -1652,6 +1652,8 @@ pub mod file_type_writer_options {
pub enum FileType {
#[prost(message, tag = "1")]
JsonOptions(super::JsonWriterOptions),
+ #[prost(message, tag = "2")]
+ ParquetOptions(super::ParquetWriterOptions),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
@@ -1662,6 +1664,30 @@ pub struct JsonWriterOptions {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct ParquetWriterOptions {
+ #[prost(message, optional, tag = "1")]
+ pub writer_properties: ::core::option::Option<WriterProperties>,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
+pub struct WriterProperties {
+ #[prost(uint64, tag = "1")]
+ pub data_page_size_limit: u64,
+ #[prost(uint64, tag = "2")]
+ pub dictionary_page_size_limit: u64,
+ #[prost(uint64, tag = "3")]
+ pub data_page_row_count_limit: u64,
+ #[prost(uint64, tag = "4")]
+ pub write_batch_size: u64,
+ #[prost(uint64, tag = "5")]
+ pub max_row_group_size: u64,
+ #[prost(string, tag = "6")]
+ pub writer_version: ::prost::alloc::string::String,
+ #[prost(string, tag = "7")]
+ pub created_by: ::prost::alloc::string::String,
+}
+#[allow(clippy::derive_partial_eq_without_eq)]
+#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FileSinkConfig {
#[prost(string, tag = "1")]
pub object_store_url: ::prost::alloc::string::String,
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index e03b3ffa7b..d137a41fa1 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -23,7 +23,8 @@ use std::sync::Arc;
use crate::common::{byte_to_string, proto_error, str_to_byte};
use crate::protobuf::logical_plan_node::LogicalPlanType::CustomScan;
use crate::protobuf::{
- copy_to_node, CustomTableScanNode, LogicalExprNodeCollection, SqlOption,
+ copy_to_node, file_type_writer_options, CustomTableScanNode,
+ LogicalExprNodeCollection, SqlOption,
};
use crate::{
convert_required,
@@ -49,7 +50,7 @@ use datafusion::{
use datafusion_common::{
context, file_options::StatementOptions, internal_err, not_impl_err,
parsers::CompressionTypeVariant, plan_datafusion_err, DataFusionError,
FileType,
- OwnedTableReference, Result,
+ FileTypeWriterOptions, OwnedTableReference, Result,
};
use datafusion_expr::{
dml,
@@ -62,6 +63,8 @@ use datafusion_expr::{
DistinctOn, DropView, Expr, LogicalPlan, LogicalPlanBuilder,
};
+use datafusion::parquet::file::properties::{WriterProperties, WriterVersion};
+use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_expr::dml::CopyOptions;
use prost::bytes::BufMut;
use prost::Message;
@@ -833,19 +836,48 @@ impl AsLogicalPlan for LogicalPlanNode {
let copy_options = match ©.copy_options {
Some(copy_to_node::CopyOptions::SqlOptions(opt)) => {
- let options = opt.option.iter().map(|o|
(o.key.clone(), o.value.clone())).collect();
- CopyOptions::SQLOptions(StatementOptions::from(
- &options,
- ))
+ let options = opt
+ .option
+ .iter()
+ .map(|o| (o.key.clone(), o.value.clone()))
+ .collect();
+
CopyOptions::SQLOptions(StatementOptions::from(&options))
}
- Some(copy_to_node::CopyOptions::WriterOptions(_)) => {
- return Err(proto_error(
- "LogicalPlan serde is not yet implemented for
CopyTo with WriterOptions",
- ))
+ Some(copy_to_node::CopyOptions::WriterOptions(opt)) => {
+ match &opt.file_type {
+ Some(ft) => match ft {
+
file_type_writer_options::FileType::ParquetOptions(
+ writer_options,
+ ) => {
+ let writer_properties =
+ match
&writer_options.writer_properties {
+ Some(serialized_writer_options) =>
{
+ writer_properties_from_proto(
+ serialized_writer_options,
+ )?
+ }
+ _ => WriterProperties::default(),
+ };
+ CopyOptions::WriterOptions(Box::new(
+ FileTypeWriterOptions::Parquet(
+
ParquetWriterOptions::new(writer_properties),
+ ),
+ ))
+ }
+ _ => {
+ return Err(proto_error(
+ "WriterOptions unsupported file_type",
+ ))
+ }
+ },
+ None => {
+ return Err(proto_error(
+ "WriterOptions missing file_type",
+ ))
+ }
+ }
}
- other => return Err(proto_error(format!(
- "LogicalPlan serde is not yet implemented for CopyTo
with CopyOptions {other:?}",
- )))
+ None => return Err(proto_error("CopyTo missing
CopyOptions")),
};
Ok(datafusion_expr::LogicalPlan::Copy(
datafusion_expr::dml::CopyTo {
@@ -1580,22 +1612,48 @@ impl AsLogicalPlan for LogicalPlanNode {
extension_codec,
)?;
- let copy_options_proto: Option<copy_to_node::CopyOptions> =
match copy_options {
- CopyOptions::SQLOptions(opt) => {
- let options: Vec<SqlOption> =
opt.clone().into_inner().iter().map(|(k, v)| SqlOption {
- key: k.to_string(),
- value: v.to_string(),
- }).collect();
-
Some(copy_to_node::CopyOptions::SqlOptions(protobuf::SqlOptions {
- option: options
- }))
- }
- CopyOptions::WriterOptions(_) => {
- return Err(proto_error(
- "LogicalPlan serde is not yet implemented for
CopyTo with WriterOptions",
- ))
- }
- };
+ let copy_options_proto: Option<copy_to_node::CopyOptions> =
+ match copy_options {
+ CopyOptions::SQLOptions(opt) => {
+ let options: Vec<SqlOption> = opt
+ .clone()
+ .into_inner()
+ .iter()
+ .map(|(k, v)| SqlOption {
+ key: k.to_string(),
+ value: v.to_string(),
+ })
+ .collect();
+ Some(copy_to_node::CopyOptions::SqlOptions(
+ protobuf::SqlOptions { option: options },
+ ))
+ }
+ CopyOptions::WriterOptions(opt) => {
+ match opt.as_ref() {
+ FileTypeWriterOptions::Parquet(parquet_opts)
=> {
+ let parquet_writer_options =
+ protobuf::ParquetWriterOptions {
+ writer_properties: Some(
+ writer_properties_to_proto(
+
&parquet_opts.writer_options,
+ ),
+ ),
+ };
+ let parquet_options =
file_type_writer_options::FileType::ParquetOptions(parquet_writer_options);
+
Some(copy_to_node::CopyOptions::WriterOptions(
+ protobuf::FileTypeWriterOptions {
+ file_type: Some(parquet_options),
+ },
+ ))
+ }
+ _ => {
+ return Err(proto_error(
+ "Unsupported FileTypeWriterOptions in
CopyTo",
+ ))
+ }
+ }
+ }
+ };
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CopyTo(Box::new(
@@ -1615,3 +1673,33 @@ impl AsLogicalPlan for LogicalPlanNode {
}
}
}
+
+pub(crate) fn writer_properties_to_proto(
+ props: &WriterProperties,
+) -> protobuf::WriterProperties {
+ protobuf::WriterProperties {
+ data_page_size_limit: props.data_page_size_limit() as u64,
+ dictionary_page_size_limit: props.dictionary_page_size_limit() as u64,
+ data_page_row_count_limit: props.data_page_row_count_limit() as u64,
+ write_batch_size: props.write_batch_size() as u64,
+ max_row_group_size: props.max_row_group_size() as u64,
+ writer_version: format!("{:?}", props.writer_version()),
+ created_by: props.created_by().to_string(),
+ }
+}
+
+pub(crate) fn writer_properties_from_proto(
+ props: &protobuf::WriterProperties,
+) -> Result<WriterProperties, DataFusionError> {
+ let writer_version = WriterVersion::from_str(&props.writer_version)
+ .map_err(|e| proto_error(e.to_string()))?;
+ Ok(WriterProperties::builder()
+ .set_created_by(props.created_by.clone())
+ .set_writer_version(writer_version)
+ .set_dictionary_page_size_limit(props.dictionary_page_size_limit as
usize)
+ .set_data_page_row_count_limit(props.data_page_row_count_limit as
usize)
+ .set_data_page_size_limit(props.data_page_size_limit as usize)
+ .set_write_batch_size(props.write_batch_size as usize)
+ .set_max_row_group_size(props.max_row_group_size as usize)
+ .build())
+}
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs
b/datafusion/proto/src/physical_plan/from_proto.rs
index 65f9f139a8..824eb60a57 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -40,6 +40,7 @@ use datafusion::physical_plan::{
functions, ColumnStatistics, Partitioning, PhysicalExpr, Statistics,
WindowExpr,
};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
+use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
@@ -52,6 +53,7 @@ use crate::logical_plan;
use crate::protobuf;
use crate::protobuf::physical_expr_node::ExprType;
+use crate::logical_plan::writer_properties_from_proto;
use chrono::{TimeZone, Utc};
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -769,6 +771,11 @@ impl TryFrom<&protobuf::FileTypeWriterOptions> for
FileTypeWriterOptions {
protobuf::file_type_writer_options::FileType::JsonOptions(opts) =>
Ok(
Self::JSON(JsonWriterOptions::new(opts.compression().into())),
),
+ protobuf::file_type_writer_options::FileType::ParquetOptions(opt)
=> {
+ let props = opt.writer_properties.clone().unwrap_or_default();
+ let writer_properties = writer_properties_from_proto(&props)?;
+ Ok(Self::Parquet(ParquetWriterOptions::new(writer_properties)))
+ }
}
}
}
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 9798b06f47..3eeae01a64 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -31,7 +31,7 @@ use datafusion::datasource::provider::TableProviderFactory;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
-use datafusion::parquet::file::properties::WriterProperties;
+use datafusion::parquet::file::properties::{WriterProperties, WriterVersion};
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::prelude::{create_udf, CsvReadOptions, SessionConfig,
SessionContext};
use datafusion::test_util::{TestTableFactory, TestTableProvider};
@@ -330,7 +330,6 @@ async fn roundtrip_logical_plan_copy_to_sql_options() ->
Result<()> {
}
#[tokio::test]
-#[ignore] // see https://github.com/apache/arrow-datafusion/issues/8619
async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
let ctx = SessionContext::new();
@@ -339,11 +338,17 @@ async fn roundtrip_logical_plan_copy_to_writer_options()
-> Result<()> {
let writer_properties = WriterProperties::builder()
.set_bloom_filter_enabled(true)
.set_created_by("DataFusion Test".to_string())
+ .set_writer_version(WriterVersion::PARQUET_2_0)
+ .set_write_batch_size(111)
+ .set_data_page_size_limit(222)
+ .set_data_page_row_count_limit(333)
+ .set_dictionary_page_size_limit(444)
+ .set_max_row_group_size(555)
.build();
let plan = LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
- output_url: "test.csv".to_string(),
- file_format: FileType::CSV,
+ output_url: "test.parquet".to_string(),
+ file_format: FileType::PARQUET,
single_file_output: true,
copy_options: CopyOptions::WriterOptions(Box::new(
FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)),
@@ -354,6 +359,34 @@ async fn roundtrip_logical_plan_copy_to_writer_options()
-> Result<()> {
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}"));
+ match logical_round_trip {
+ LogicalPlan::Copy(copy_to) => {
+ assert_eq!("test.parquet", copy_to.output_url);
+ assert_eq!(FileType::PARQUET, copy_to.file_format);
+ assert!(copy_to.single_file_output);
+ match ©_to.copy_options {
+ CopyOptions::WriterOptions(y) => match y.as_ref() {
+ FileTypeWriterOptions::Parquet(p) => {
+ let props = &p.writer_options;
+ assert_eq!("DataFusion Test", props.created_by());
+ assert_eq!(
+ "PARQUET_2_0",
+ format!("{:?}", props.writer_version())
+ );
+ assert_eq!(111, props.write_batch_size());
+ assert_eq!(222, props.data_page_size_limit());
+ assert_eq!(333, props.data_page_row_count_limit());
+ assert_eq!(444, props.dictionary_page_size_limit());
+ assert_eq!(555, props.max_row_group_size());
+ }
+ _ => panic!(),
+ },
+ _ => panic!(),
+ }
+ }
+ _ => panic!(),
+ }
+
Ok(())
}