This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new 14cd71eaba feat: add compression level configuration for JSON/CSV writers (#18954)
14cd71eaba is described below
commit 14cd71eaba76ea43aa0adb2d88a86b36495ee0cc
Author: Smotrov Oleksii <[email protected]>
AuthorDate: Thu Dec 18 00:51:33 2025 +0200
feat: add compression level configuration for JSON/CSV writers (#18954)
## Which issue does this PR close?
Closes #18947
## Rationale for this change
Currently, DataFusion uses default compression levels when writing
compressed JSON and CSV files. For ZSTD, this means level 3, which
prioritizes speed over compression ratio. Users working with large
datasets who want to optimize for storage costs or network transfer have
no way to increase the compression level.
This is particularly important for cloud data lake scenarios where
storage and egress costs can be significant.
## What changes are included in this PR?
- Add `compression_level: Option<u32>` field to `JsonOptions` and
`CsvOptions` in `config.rs`
- Add `convert_async_writer_with_level()` method to
`FileCompressionType` (non-breaking API extension)
- Keep original `convert_async_writer()` as a convenience wrapper for
backward compatibility
- Update `JsonWriterOptions` and `CsvWriterOptions` with a `compression_level` field and `new_with_level` constructors (see the sketch after this list)
- Update `ObjectWriterBuilder` to support compression level
- Update JSON and CSV sinks to pass compression level through the write
pipeline
- Update proto definitions and conversions for serialization support
- Fix unrelated unused import warning in `udf.rs` (conditional
compilation for debug-only imports)
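As an illustration of the new constructors, here is a minimal sketch (not part of the PR itself). The import paths assume the usual `datafusion` facade re-exports and may need adjusting to the actual module layout:
```rust
// Sketch only: exercises the `new_with_level` constructors added in this PR.
// Import paths below are assumptions based on the facade re-exports.
use datafusion::arrow::csv::WriterBuilder;
use datafusion::common::file_options::csv_writer::CsvWriterOptions;
use datafusion::common::file_options::json_writer::JsonWriterOptions;
use datafusion::common::parsers::CompressionTypeVariant;

fn writer_options_with_levels() -> (CsvWriterOptions, JsonWriterOptions) {
    // CSV written as ZSTD at level 9 instead of the default level 3
    let csv = CsvWriterOptions::new_with_level(
        WriterBuilder::new(),
        CompressionTypeVariant::ZSTD,
        9,
    );
    // JSON written as GZIP at level 9 instead of the default level 6
    let json = JsonWriterOptions::new_with_level(CompressionTypeVariant::GZIP, 9);
    (csv, json)
}
```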
## Are these changes tested?
The changes follow the existing patterns used throughout the codebase.
The implementation was verified by:
- Building successfully with `cargo build`
- Running existing tests with `cargo test --package datafusion-proto`
- All 131 proto integration tests pass
## Are there any user-facing changes?
Yes, users can now specify compression level when writing JSON/CSV
files:
```rust
use datafusion::common::config::JsonOptions;
use datafusion::common::parsers::CompressionTypeVariant;
let json_opts = JsonOptions {
compression: CompressionTypeVariant::ZSTD,
compression_level: Some(9), // Higher compression
..Default::default()
};
```
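The CSV options get the same knob via the `with_compression_level` builder method added in this PR; a short sketch, with import paths assumed analogous to the JSON example above:
```rust
use datafusion::common::config::CsvOptions;
use datafusion::common::parsers::CompressionTypeVariant;

let csv_opts = CsvOptions {
    compression: CompressionTypeVariant::GZIP,
    ..Default::default()
}
.with_compression_level(9); // GZIP accepts 0-9; the default is 6
```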
**Supported compression levels:**
- ZSTD: 1-22 (default: 3)
- GZIP: 0-9 (default: 6)
- BZIP2: 1-9 (default: 9)
- XZ: 0-9 (default: 6)
**This is a non-breaking change** - the original
`convert_async_writer()` method signature is preserved for backward
compatibility.
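For code that wraps object-store writers directly, the relationship between the preserved and the new entry point looks roughly like this. The sketch assumes the `compression` feature is enabled and that the module paths shown are the correct re-exports; treat it as illustrative rather than the exact API surface:
```rust
use std::sync::Arc;

use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::error::Result;
use object_store::buffered::BufWriter;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;

fn wrap_writers() -> Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let compression = FileCompressionType::ZSTD;

    // Existing API, unchanged: uses the codec's default level (ZSTD level 3).
    let _default_level = compression.convert_async_writer(BufWriter::new(
        Arc::clone(&store),
        Path::from("out/a.json.zst"),
    ))?;

    // New API: pass an explicit level (here ZSTD level 19).
    let _explicit_level = compression.convert_async_writer_with_level(
        BufWriter::new(store, Path::from("out/b.json.zst")),
        Some(19),
    )?;
    Ok(())
}
```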
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/common/src/config.rs | 25 +++++++++++
datafusion/common/src/file_options/csv_writer.rs | 17 ++++++++
datafusion/common/src/file_options/json_writer.rs | 18 +++++++-
datafusion/datasource-csv/src/file_format.rs | 1 +
datafusion/datasource-json/src/file_format.rs | 1 +
datafusion/datasource/src/file_compression_type.rs | 48 +++++++++++++++++++---
datafusion/datasource/src/write/mod.rs | 23 ++++++++++-
datafusion/datasource/src/write/orchestration.rs | 2 +
.../proto-common/proto/datafusion_common.proto | 2 +
datafusion/proto-common/src/from_proto/mod.rs | 2 +
datafusion/proto-common/src/generated/pbjson.rs | 40 ++++++++++++++++++
datafusion/proto-common/src/generated/prost.rs | 6 +++
datafusion/proto-common/src/to_proto/mod.rs | 2 +
.../proto/src/generated/datafusion_proto_common.rs | 6 +++
datafusion/proto/src/logical_plan/file_formats.rs | 4 ++
.../proto/tests/cases/roundtrip_logical_plan.rs | 6 ++-
16 files changed, 195 insertions(+), 8 deletions(-)
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index c8ed491ef4..949fd83512 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1725,6 +1725,7 @@ config_field!(bool, value => default_config_transform(value.to_lowercase().as_st
config_field!(usize);
config_field!(f64);
config_field!(u64);
+config_field!(u32);
impl ConfigField for u8 {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
@@ -2844,6 +2845,14 @@ config_namespace! {
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub newlines_in_values: Option<bool>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
+ /// Compression level for the output file. The valid range depends on the
+ /// compression algorithm:
+ /// - ZSTD: 1 to 22 (default: 3)
+ /// - GZIP: 0 to 9 (default: 6)
+ /// - BZIP2: 0 to 9 (default: 6)
+ /// - XZ: 0 to 9 (default: 6)
+ /// If not specified, the default level for the compression algorithm is used.
+ pub compression_level: Option<u32>, default = None
pub schema_infer_max_rec: Option<usize>, default = None
pub date_format: Option<String>, default = None
pub datetime_format: Option<String>, default = None
@@ -2966,6 +2975,14 @@ impl CsvOptions {
self
}
+ /// Set the compression level for the output file.
+ /// The valid range depends on the compression algorithm.
+ /// If not specified, the default level for the algorithm is used.
+ pub fn with_compression_level(mut self, level: u32) -> Self {
+ self.compression_level = Some(level);
+ self
+ }
+
/// The delimiter character.
pub fn delimiter(&self) -> u8 {
self.delimiter
@@ -2991,6 +3008,14 @@ config_namespace! {
/// Options controlling JSON format
pub struct JsonOptions {
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
+ /// Compression level for the output file. The valid range depends on the
+ /// compression algorithm:
+ /// - ZSTD: 1 to 22 (default: 3)
+ /// - GZIP: 0 to 9 (default: 6)
+ /// - BZIP2: 0 to 9 (default: 6)
+ /// - XZ: 0 to 9 (default: 6)
+ /// If not specified, the default level for the compression algorithm is used.
+ pub compression_level: Option<u32>, default = None
pub schema_infer_max_rec: Option<usize>, default = None
}
}
diff --git a/datafusion/common/src/file_options/csv_writer.rs b/datafusion/common/src/file_options/csv_writer.rs
index 943288af91..4e6f74a444 100644
--- a/datafusion/common/src/file_options/csv_writer.rs
+++ b/datafusion/common/src/file_options/csv_writer.rs
@@ -31,6 +31,8 @@ pub struct CsvWriterOptions {
/// Compression to apply after ArrowWriter serializes RecordBatches.
/// This compression is applied by DataFusion not the ArrowWriter itself.
pub compression: CompressionTypeVariant,
+ /// Compression level for the output file.
+ pub compression_level: Option<u32>,
}
impl CsvWriterOptions {
@@ -41,6 +43,20 @@ impl CsvWriterOptions {
Self {
writer_options,
compression,
+ compression_level: None,
+ }
+ }
+
+ /// Create a new `CsvWriterOptions` with the specified compression level.
+ pub fn new_with_level(
+ writer_options: WriterBuilder,
+ compression: CompressionTypeVariant,
+ compression_level: u32,
+ ) -> Self {
+ Self {
+ writer_options,
+ compression,
+ compression_level: Some(compression_level),
}
}
}
@@ -81,6 +97,7 @@ impl TryFrom<&CsvOptions> for CsvWriterOptions {
Ok(CsvWriterOptions {
writer_options: builder,
compression: value.compression,
+ compression_level: value.compression_level,
})
}
}
diff --git a/datafusion/common/src/file_options/json_writer.rs b/datafusion/common/src/file_options/json_writer.rs
index 750d297232..a537192c81 100644
--- a/datafusion/common/src/file_options/json_writer.rs
+++ b/datafusion/common/src/file_options/json_writer.rs
@@ -27,11 +27,26 @@ use crate::{
#[derive(Clone, Debug)]
pub struct JsonWriterOptions {
pub compression: CompressionTypeVariant,
+ pub compression_level: Option<u32>,
}
impl JsonWriterOptions {
pub fn new(compression: CompressionTypeVariant) -> Self {
- Self { compression }
+ Self {
+ compression,
+ compression_level: None,
+ }
+ }
+
+ /// Create a new `JsonWriterOptions` with the specified compression and level.
+ pub fn new_with_level(
+ compression: CompressionTypeVariant,
+ compression_level: u32,
+ ) -> Self {
+ Self {
+ compression,
+ compression_level: Some(compression_level),
+ }
}
}
@@ -41,6 +56,7 @@ impl TryFrom<&JsonOptions> for JsonWriterOptions {
fn try_from(value: &JsonOptions) -> Result<Self> {
Ok(JsonWriterOptions {
compression: value.compression,
+ compression_level: value.compression_level,
})
}
}
diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs
index e81ae63011..7729f13901 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -780,6 +780,7 @@ impl FileSink for CsvSink {
context,
serializer,
self.writer_options.compression.into(),
+ self.writer_options.compression_level,
object_store,
demux_task,
file_stream_rx,
diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs
index 0f8754ce72..a14458b5ac 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -373,6 +373,7 @@ impl FileSink for JsonSink {
context,
serializer,
self.writer_options.compression.into(),
+ self.writer_options.compression_level,
object_store,
demux_task,
file_stream_rx,
diff --git a/datafusion/datasource/src/file_compression_type.rs b/datafusion/datasource/src/file_compression_type.rs
index ebc0e7821f..89efb58065 100644
--- a/datafusion/datasource/src/file_compression_type.rs
+++ b/datafusion/datasource/src/file_compression_type.rs
@@ -155,22 +155,60 @@ impl FileCompressionType {
}
/// Wrap the given `BufWriter` so that it performs compressed writes
- /// according to this `FileCompressionType`.
+ /// according to this `FileCompressionType` using the default compression level.
pub fn convert_async_writer(
&self,
w: BufWriter,
) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
+ self.convert_async_writer_with_level(w, None)
+ }
+
+ /// Wrap the given `BufWriter` so that it performs compressed writes
+ /// according to this `FileCompressionType`.
+ ///
+ /// If `compression_level` is `Some`, the encoder will use the specified
+ /// compression level. If `None`, the default level for each algorithm is used.
+ pub fn convert_async_writer_with_level(
+ &self,
+ w: BufWriter,
+ compression_level: Option<u32>,
+ ) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
+ #[cfg(feature = "compression")]
+ use async_compression::Level;
+
Ok(match self.variant {
#[cfg(feature = "compression")]
- GZIP => Box::new(GzipEncoder::new(w)),
+ GZIP => match compression_level {
+ Some(level) => {
+ Box::new(GzipEncoder::with_quality(w, Level::Precise(level as i32)))
+ }
+ None => Box::new(GzipEncoder::new(w)),
+ },
#[cfg(feature = "compression")]
- BZIP2 => Box::new(BzEncoder::new(w)),
+ BZIP2 => match compression_level {
+ Some(level) => {
+ Box::new(BzEncoder::with_quality(w, Level::Precise(level as i32)))
+ }
+ None => Box::new(BzEncoder::new(w)),
+ },
#[cfg(feature = "compression")]
- XZ => Box::new(XzEncoder::new(w)),
+ XZ => match compression_level {
+ Some(level) => {
+ Box::new(XzEncoder::with_quality(w, Level::Precise(level as i32)))
+ }
+ None => Box::new(XzEncoder::new(w)),
+ },
#[cfg(feature = "compression")]
- ZSTD => Box::new(ZstdEncoder::new(w)),
+ ZSTD => match compression_level {
+ Some(level) => {
+ Box::new(ZstdEncoder::with_quality(w, Level::Precise(level as i32)))
+ }
+ None => Box::new(ZstdEncoder::new(w)),
+ },
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
+ // compression_level is not used when compression feature is disabled
+ let _ = compression_level;
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
));
diff --git a/datafusion/datasource/src/write/mod.rs b/datafusion/datasource/src/write/mod.rs
index ca65477c49..e8d2d17da8 100644
--- a/datafusion/datasource/src/write/mod.rs
+++ b/datafusion/datasource/src/write/mod.rs
@@ -131,6 +131,8 @@ pub struct ObjectWriterBuilder {
object_store: Arc<dyn ObjectStore>,
/// The size of the buffer for the object writer.
buffer_size: Option<usize>,
+ /// The compression level for the object writer.
+ compression_level: Option<u32>,
}
impl ObjectWriterBuilder {
@@ -145,6 +147,7 @@ impl ObjectWriterBuilder {
location: location.clone(),
object_store,
buffer_size: None,
+ compression_level: None,
}
}
@@ -202,6 +205,22 @@ impl ObjectWriterBuilder {
self.buffer_size
}
+ /// Set compression level for object writer.
+ pub fn set_compression_level(&mut self, compression_level: Option<u32>) {
+ self.compression_level = compression_level;
+ }
+
+ /// Set compression level for object writer, returning the builder.
+ pub fn with_compression_level(mut self, compression_level: Option<u32>) -> Self {
+ self.compression_level = compression_level;
+ self
+ }
+
+ /// Currently specified compression level.
+ pub fn get_compression_level(&self) -> Option<u32> {
+ self.compression_level
+ }
+
/// Return a writer object that writes to the object store location.
///
/// If a buffer size has not been set, the default buffer buffer size will
@@ -215,6 +234,7 @@ impl ObjectWriterBuilder {
location,
object_store,
buffer_size,
+ compression_level,
} = self;
let buf_writer = match buffer_size {
@@ -222,6 +242,7 @@ impl ObjectWriterBuilder {
None => BufWriter::new(object_store, location),
};
- file_compression_type.convert_async_writer(buf_writer)
+ file_compression_type
+ .convert_async_writer_with_level(buf_writer, compression_level)
}
}
diff --git a/datafusion/datasource/src/write/orchestration.rs b/datafusion/datasource/src/write/orchestration.rs
index 1672817de0..39c91a1c0d 100644
--- a/datafusion/datasource/src/write/orchestration.rs
+++ b/datafusion/datasource/src/write/orchestration.rs
@@ -248,6 +248,7 @@ pub async fn spawn_writer_tasks_and_join(
context: &Arc<TaskContext>,
serializer: Arc<dyn BatchSerializer>,
compression: FileCompressionType,
+ compression_level: Option<u32>,
object_store: Arc<dyn ObjectStore>,
demux_task: SpawnedTask<Result<()>>,
mut file_stream_rx: DemuxedStreamReceiver,
@@ -273,6 +274,7 @@ pub async fn spawn_writer_tasks_and_join(
.execution
.objectstore_writer_buffer_size,
))
+ .with_compression_level(compression_level)
.build()?;
if tx_file_bundle
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index 9d7d33dc01..08bb25bd71 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -461,12 +461,14 @@ message CsvOptions {
bytes newlines_in_values = 16; // Indicates if newlines are supported in
values
bytes terminator = 17; // Optional terminator character as a byte
bytes truncated_rows = 18; // Indicates if truncated rows are allowed
+ optional uint32 compression_level = 19; // Optional compression level
}
// Options controlling CSV format
message JsonOptions {
CompressionTypeVariant compression = 1; // Compression type
optional uint64 schema_infer_max_rec = 2; // Optional max records for schema
inference
+ optional uint32 compression_level = 3; // Optional compression level
}
message TableParquetOptions {
diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs
index c591a202af..7cb7a92031 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -908,6 +908,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
double_quote: proto_opts.double_quote.first().map(|h| *h != 0),
newlines_in_values: proto_opts.newlines_in_values.first().map(|h|
*h != 0),
compression: proto_opts.compression().into(),
+ compression_level: proto_opts.compression_level,
schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as
usize),
date_format: (!proto_opts.date_format.is_empty())
.then(|| proto_opts.date_format.clone()),
@@ -1095,6 +1096,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
let compression: protobuf::CompressionTypeVariant =
proto_opts.compression();
Ok(JsonOptions {
compression: compression.into(),
+ compression_level: proto_opts.compression_level,
schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as
usize),
})
}
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index 0bf87203ac..d38cf86825 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -1684,6 +1684,9 @@ impl serde::Serialize for CsvOptions {
if !self.truncated_rows.is_empty() {
len += 1;
}
+ if self.compression_level.is_some() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion_common.CsvOptions", len)?;
if !self.has_header.is_empty() {
#[allow(clippy::needless_borrow)]
@@ -1761,6 +1764,9 @@ impl serde::Serialize for CsvOptions {
#[allow(clippy::needless_borrows_for_generic_args)]
struct_ser.serialize_field("truncatedRows",
pbjson::private::base64::encode(&self.truncated_rows).as_str())?;
}
+ if let Some(v) = self.compression_level.as_ref() {
+ struct_ser.serialize_field("compressionLevel", v)?;
+ }
struct_ser.end()
}
}
@@ -1801,6 +1807,8 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
"terminator",
"truncated_rows",
"truncatedRows",
+ "compression_level",
+ "compressionLevel",
];
#[allow(clippy::enum_variant_names)]
@@ -1823,6 +1831,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
NewlinesInValues,
Terminator,
TruncatedRows,
+ CompressionLevel,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -1862,6 +1871,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
"newlinesInValues" | "newlines_in_values" =>
Ok(GeneratedField::NewlinesInValues),
"terminator" => Ok(GeneratedField::Terminator),
"truncatedRows" | "truncated_rows" =>
Ok(GeneratedField::TruncatedRows),
+ "compressionLevel" | "compression_level" =>
Ok(GeneratedField::CompressionLevel),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -1899,6 +1909,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
let mut newlines_in_values__ = None;
let mut terminator__ = None;
let mut truncated_rows__ = None;
+ let mut compression_level__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::HasHeader => {
@@ -2029,6 +2040,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
;
}
+ GeneratedField::CompressionLevel => {
+ if compression_level__.is_some() {
+ return Err(serde::de::Error::duplicate_field("compressionLevel"));
+ }
+ compression_level__ =
+ map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
+ ;
+ }
}
}
Ok(CsvOptions {
@@ -2050,6 +2069,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
newlines_in_values:
newlines_in_values__.unwrap_or_default(),
terminator: terminator__.unwrap_or_default(),
truncated_rows: truncated_rows__.unwrap_or_default(),
+ compression_level: compression_level__,
})
}
}
@@ -4566,6 +4586,9 @@ impl serde::Serialize for JsonOptions {
if self.schema_infer_max_rec.is_some() {
len += 1;
}
+ if self.compression_level.is_some() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion_common.JsonOptions", len)?;
if self.compression != 0 {
let v = CompressionTypeVariant::try_from(self.compression)
@@ -4577,6 +4600,9 @@ impl serde::Serialize for JsonOptions {
#[allow(clippy::needless_borrows_for_generic_args)]
struct_ser.serialize_field("schemaInferMaxRec",
ToString::to_string(&v).as_str())?;
}
+ if let Some(v) = self.compression_level.as_ref() {
+ struct_ser.serialize_field("compressionLevel", v)?;
+ }
struct_ser.end()
}
}
@@ -4590,12 +4616,15 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
"compression",
"schema_infer_max_rec",
"schemaInferMaxRec",
+ "compression_level",
+ "compressionLevel",
];
#[allow(clippy::enum_variant_names)]
enum GeneratedField {
Compression,
SchemaInferMaxRec,
+ CompressionLevel,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -4619,6 +4648,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
match value {
"compression" => Ok(GeneratedField::Compression),
"schemaInferMaxRec" | "schema_infer_max_rec" =>
Ok(GeneratedField::SchemaInferMaxRec),
+ "compressionLevel" | "compression_level" =>
Ok(GeneratedField::CompressionLevel),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -4640,6 +4670,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
{
let mut compression__ = None;
let mut schema_infer_max_rec__ = None;
+ let mut compression_level__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Compression => {
@@ -4656,11 +4687,20 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
x.0)
;
}
+ GeneratedField::CompressionLevel => {
+ if compression_level__.is_some() {
+ return Err(serde::de::Error::duplicate_field("compressionLevel"));
+ }
+ compression_level__ =
+ map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
+ ;
+ }
}
}
Ok(JsonOptions {
compression: compression__.unwrap_or_default(),
schema_infer_max_rec: schema_infer_max_rec__,
+ compression_level: compression_level__,
})
}
}
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index 9c4b7e1252..16601dcf46 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -649,6 +649,9 @@ pub struct CsvOptions {
/// Indicates if truncated rows are allowed
#[prost(bytes = "vec", tag = "18")]
pub truncated_rows: ::prost::alloc::vec::Vec<u8>,
+ /// Optional compression level
+ #[prost(uint32, optional, tag = "19")]
+ pub compression_level: ::core::option::Option<u32>,
}
/// Options controlling CSV format
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
@@ -659,6 +662,9 @@ pub struct JsonOptions {
/// Optional max records for schema inference
#[prost(uint64, optional, tag = "2")]
pub schema_infer_max_rec: ::core::option::Option<u64>,
+ /// Optional compression level
+ #[prost(uint32, optional, tag = "3")]
+ pub compression_level: ::core::option::Option<u32>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TableParquetOptions {
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index ca1057da4e..dfa136717f 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -976,6 +976,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
null_regex: opts.null_regex.clone().unwrap_or_default(),
comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
truncated_rows: opts.truncated_rows.map_or_else(Vec::new, |h|
vec![h as u8]),
+ compression_level: opts.compression_level,
})
}
}
@@ -988,6 +989,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
Ok(protobuf::JsonOptions {
compression: compression.into(),
schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
+ compression_level: opts.compression_level,
})
}
}
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs
index 9c4b7e1252..16601dcf46 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -649,6 +649,9 @@ pub struct CsvOptions {
/// Indicates if truncated rows are allowed
#[prost(bytes = "vec", tag = "18")]
pub truncated_rows: ::prost::alloc::vec::Vec<u8>,
+ /// Optional compression level
+ #[prost(uint32, optional, tag = "19")]
+ pub compression_level: ::core::option::Option<u32>,
}
/// Options controlling CSV format
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
@@ -659,6 +662,9 @@ pub struct JsonOptions {
/// Optional max records for schema inference
#[prost(uint64, optional, tag = "2")]
pub schema_infer_max_rec: ::core::option::Option<u64>,
+ /// Optional compression level
+ #[prost(uint32, optional, tag = "3")]
+ pub compression_level: ::core::option::Option<u32>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TableParquetOptions {
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs
index 508edd3eff..87ce4d524f 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -62,6 +62,7 @@ impl CsvOptionsProto {
.newlines_in_values
.map_or(vec![], |v| vec![v as u8]),
truncated_rows: options.truncated_rows.map_or(vec![], |v|
vec![v as u8]),
+ compression_level: options.compression_level,
}
} else {
CsvOptionsProto::default()
@@ -152,6 +153,7 @@ impl From<&CsvOptionsProto> for CsvOptions {
} else {
Some(proto.truncated_rows[0] != 0)
},
+ compression_level: proto.compression_level,
}
}
}
@@ -238,6 +240,7 @@ impl JsonOptionsProto {
JsonOptionsProto {
compression: options.compression as i32,
schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v
as u64),
+ compression_level: options.compression_level,
}
} else {
JsonOptionsProto::default()
@@ -256,6 +259,7 @@ impl From<&JsonOptionsProto> for JsonOptions {
_ => CompressionTypeVariant::UNCOMPRESSED,
},
schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as
usize),
+ compression_level: proto.compression_level,
}
}
}
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index f1be8e207f..77676fc2fd 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -549,6 +549,8 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
csv_format.timestamp_format = Some("HH:mm:ss.SSSSSS".to_string());
csv_format.time_format = Some("HH:mm:ss".to_string());
csv_format.null_value = Some("NIL".to_string());
+ csv_format.compression = CompressionTypeVariant::GZIP;
+ csv_format.compression_level = Some(6);
let file_type =
format_as_file_type(Arc::new(CsvFormatFactory::new_with_options(
csv_format.clone(),
@@ -593,7 +595,9 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
assert_eq!(csv_format.datetime_format, csv_config.datetime_format);
assert_eq!(csv_format.timestamp_format,
csv_config.timestamp_format);
assert_eq!(csv_format.time_format, csv_config.time_format);
- assert_eq!(csv_format.null_value, csv_config.null_value)
+ assert_eq!(csv_format.null_value, csv_config.null_value);
+ assert_eq!(csv_format.compression, csv_config.compression);
+ assert_eq!(csv_format.compression_level, csv_config.compression_level);
}
_ => panic!(),
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]