This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 2587df09c3 Support `newlines_in_values` CSV option (#11533)
2587df09c3 is described below
commit 2587df09c3fd9659f5076cedf98046e258764b2e
Author: Chris Connelly <[email protected]>
AuthorDate: Mon Jul 22 00:42:03 2024 +0100
Support `newlines_in_values` CSV option (#11533)
* feat!: support `newlines_in_values` CSV option
This significantly simplifies the UX when dealing with large CSV files
that must support newlines in (quoted) values. By default, large CSV
files will be repartitioned into multiple parallel range scans. This is
great for performance in the common case but when large CSVs contain
newlines in values the parallel scan will fail due to splitting on
newlines within quotes rather than actual line terminators.
With the current implementation, this behaviour can be controlled by the
session-level `datafusion.optimizer.repartition_file_scans` and
`datafusion.optimizer.repartition_file_min_size` settings.
This commit introduces a `newlines_in_values` option to `CsvOptions` and
plumbs it through to `CsvExec`, which includes it in the test for whether
parallel execution is supported. This provides a convenient and
searchable way to disable file scan repartitioning on a per-CSV basis.
BREAKING CHANGE: This adds new public fields to types with all public
fields, which is a breaking change.
* docs: normalise `newlines_in_values` documentation
* test: add/fix sqllogictests for `newlines_in_values`
* docs: document `datafusion.catalog.newlines_in_values`
* fix: typo in config.md
* chore: suppress lint on too many arguments for `CsvExec::new`
* fix: always checkout `*.slt` with LF line endings
This is a bit of a stab in the dark, but it might fix multiline tests on
Windows.
* fix: always checkout `newlines_in_values.csv` with `LF` line endings
The default git behaviour of converting line endings for checked out files
causes the `csv_files.slt` test to fail when testing `newlines_in_values`. This
appears to be due to the quoted newlines being converted to CRLF, which are not
then normalised when the CSV is read. Assuming that the sqllogictests do
normalise line endings in the expected output, this could then lead to a
"spurious" diff from the actual output.
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.gitattributes | 1 +
datafusion/common/src/config.rs | 30 +++++++++++++
datafusion/core/src/datasource/file_format/csv.rs | 50 ++++++++++++++++++++++
.../core/src/datasource/file_format/options.rs | 22 ++++++++++
.../core/src/datasource/physical_plan/csv.rs | 27 ++++++++++--
.../src/physical_optimizer/enforce_distribution.rs | 3 ++
.../src/physical_optimizer/projection_pushdown.rs | 3 ++
.../replace_with_order_preserving_variants.rs | 1 +
datafusion/core/src/test/mod.rs | 3 ++
datafusion/core/tests/data/newlines_in_values.csv | 13 ++++++
.../proto-common/proto/datafusion_common.proto | 1 +
datafusion/proto-common/src/from_proto/mod.rs | 1 +
datafusion/proto-common/src/generated/pbjson.rs | 21 +++++++++
datafusion/proto-common/src/generated/prost.rs | 3 ++
datafusion/proto-common/src/to_proto/mod.rs | 3 ++
datafusion/proto/proto/datafusion.proto | 1 +
.../proto/src/generated/datafusion_proto_common.rs | 3 ++
datafusion/proto/src/generated/pbjson.rs | 18 ++++++++
datafusion/proto/src/generated/prost.rs | 2 +
datafusion/proto/src/physical_plan/mod.rs | 2 +
datafusion/sqllogictest/test_files/csv_files.slt | 42 ++++++++++++++++++
.../sqllogictest/test_files/information_schema.slt | 2 +
docs/source/user-guide/configs.md | 1 +
23 files changed, 250 insertions(+), 3 deletions(-)
diff --git a/.gitattributes b/.gitattributes
index bcdeffc09a..84b47a6fc5 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1,3 +1,4 @@
.github/ export-ignore
+datafusion/core/tests/data/newlines_in_values.csv text eol=lf
datafusion/proto/src/generated/prost.rs linguist-generated
datafusion/proto/src/generated/pbjson.rs linguist-generated
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index b46b002baa..3cbe14cb55 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -184,6 +184,16 @@ config_namespace! {
/// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
/// if not specified explicitly in the statement.
pub has_header: bool, default = false
+
+ /// Specifies whether newlines in (quoted) CSV values are supported.
+ ///
+ /// This is the default value for `format.newlines_in_values` for
`CREATE EXTERNAL TABLE`
+ /// if not specified explicitly in the statement.
+ ///
+ /// Parsing newlines in quoted values may be affected by execution
behaviour such as
+ /// parallel file scanning. Setting this to `true` ensures that
newlines in values are
+ /// parsed successfully, which may reduce performance.
+ pub newlines_in_values: bool, default = false
}
}
@@ -1593,6 +1603,14 @@ config_namespace! {
pub quote: u8, default = b'"'
pub escape: Option<u8>, default = None
pub double_quote: Option<bool>, default = None
+ /// Specifies whether newlines in (quoted) values are supported.
+ ///
+ /// Parsing newlines in quoted values may be affected by execution
behaviour such as
+ /// parallel file scanning. Setting this to `true` ensures that
newlines in values are
+ /// parsed successfully, which may reduce performance.
+ ///
+ /// The default behaviour depends on the
`datafusion.catalog.newlines_in_values` setting.
+ pub newlines_in_values: Option<bool>, default = None
pub compression: CompressionTypeVariant, default =
CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: usize, default = 100
pub date_format: Option<String>, default = None
@@ -1665,6 +1683,18 @@ impl CsvOptions {
self
}
+ /// Specifies whether newlines in (quoted) values are supported.
+ ///
+ /// Parsing newlines in quoted values may be affected by execution
behaviour such as
+ /// parallel file scanning. Setting this to `true` ensures that newlines
in values are
+ /// parsed successfully, which may reduce performance.
+ ///
+ /// The default behaviour depends on the
`datafusion.catalog.newlines_in_values` setting.
+ pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self
{
+ self.newlines_in_values = Some(newlines_in_values);
+ self
+ }
+
/// Set a `CompressionTypeVariant` of CSV
/// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
pub fn with_file_compression_type(
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index 5daa844755..185f50883b 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -233,6 +233,18 @@ impl CsvFormat {
self
}
+ /// Specifies whether newlines in (quoted) values are supported.
+ ///
+ /// Parsing newlines in quoted values may be affected by execution
behaviour such as
+ /// parallel file scanning. Setting this to `true` ensures that newlines
in values are
+ /// parsed successfully, which may reduce performance.
+ ///
+ /// The default behaviour depends on the
`datafusion.catalog.newlines_in_values` setting.
+ pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self
{
+ self.options.newlines_in_values = Some(newlines_in_values);
+ self
+ }
+
/// Set a `FileCompressionType` of CSV
/// - defaults to `FileCompressionType::UNCOMPRESSED`
pub fn with_file_compression_type(
@@ -330,6 +342,9 @@ impl FileFormat for CsvFormat {
self.options.quote,
self.options.escape,
self.options.comment,
+ self.options
+ .newlines_in_values
+ .unwrap_or(state.config_options().catalog.newlines_in_values),
self.options.compression.into(),
);
Ok(Arc::new(exec))
@@ -1052,6 +1067,41 @@ mod tests {
Ok(())
}
+ #[rstest(n_partitions, case(1), case(2), case(3), case(4))]
+ #[tokio::test]
+ async fn test_csv_parallel_newlines_in_values(n_partitions: usize) ->
Result<()> {
+ let config = SessionConfig::new()
+ .with_repartition_file_scans(true)
+ .with_repartition_file_min_size(0)
+ .with_target_partitions(n_partitions);
+ let csv_options = CsvReadOptions::default()
+ .has_header(true)
+ .newlines_in_values(true);
+ let ctx = SessionContext::new_with_config(config);
+ let testdata = arrow_test_data();
+ ctx.register_csv(
+ "aggr",
+ &format!("{testdata}/csv/aggregate_test_100.csv"),
+ csv_options,
+ )
+ .await?;
+
+ let query = "select sum(c3) from aggr;";
+ let query_result = ctx.sql(query).await?.collect().await?;
+ let actual_partitions = count_query_csv_partitions(&ctx, query).await?;
+
+ #[rustfmt::skip]
+ let expected = ["+--------------+",
+ "| sum(aggr.c3) |",
+ "+--------------+",
+ "| 781 |",
+ "+--------------+"];
+ assert_batches_eq!(expected, &query_result);
+ assert_eq!(1, actual_partitions); // csv won't be scanned in parallel
when newlines_in_values is set
+
+ Ok(())
+ }
+
/// Read a single empty csv file in parallel
///
/// empty_0_byte.csv:
diff --git a/datafusion/core/src/datasource/file_format/options.rs
b/datafusion/core/src/datasource/file_format/options.rs
index c6d143ed67..552977baba 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -63,6 +63,14 @@ pub struct CsvReadOptions<'a> {
pub escape: Option<u8>,
/// If enabled, lines beginning with this byte are ignored.
pub comment: Option<u8>,
+ /// Specifies whether newlines in (quoted) values are supported.
+ ///
+ /// Parsing newlines in quoted values may be affected by execution
behaviour such as
+ /// parallel file scanning. Setting this to `true` ensures that newlines
in values are
+ /// parsed successfully, which may reduce performance.
+ ///
+ /// The default behaviour depends on the
`datafusion.catalog.newlines_in_values` setting.
+ pub newlines_in_values: bool,
/// An optional schema representing the CSV files. If None, CSV reader
will try to infer it
/// based on data in file.
pub schema: Option<&'a Schema>,
@@ -95,6 +103,7 @@ impl<'a> CsvReadOptions<'a> {
delimiter: b',',
quote: b'"',
escape: None,
+ newlines_in_values: false,
file_extension: DEFAULT_CSV_EXTENSION,
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
@@ -133,6 +142,18 @@ impl<'a> CsvReadOptions<'a> {
self
}
+ /// Specifies whether newlines in (quoted) values are supported.
+ ///
+ /// Parsing newlines in quoted values may be affected by execution
behaviour such as
+ /// parallel file scanning. Setting this to `true` ensures that newlines
in values are
+ /// parsed successfully, which may reduce performance.
+ ///
+ /// The default behaviour depends on the
`datafusion.catalog.newlines_in_values` setting.
+ pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self {
+ self.newlines_in_values = newlines_in_values;
+ self
+ }
+
/// Specify the file extension for CSV file selection
pub fn file_extension(mut self, file_extension: &'a str) -> Self {
self.file_extension = file_extension;
@@ -490,6 +511,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_delimiter(self.delimiter)
.with_quote(self.quote)
.with_escape(self.escape)
+ .with_newlines_in_values(self.newlines_in_values)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs
b/datafusion/core/src/datasource/physical_plan/csv.rs
index 327fbd976e..fb0e23c6c1 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -59,6 +59,7 @@ pub struct CsvExec {
quote: u8,
escape: Option<u8>,
comment: Option<u8>,
+ newlines_in_values: bool,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Compression type of the file associated with CsvExec
@@ -68,6 +69,7 @@ pub struct CsvExec {
impl CsvExec {
/// Create a new CSV reader execution plan provided base and specific
configurations
+ #[allow(clippy::too_many_arguments)]
pub fn new(
base_config: FileScanConfig,
has_header: bool,
@@ -75,6 +77,7 @@ impl CsvExec {
quote: u8,
escape: Option<u8>,
comment: Option<u8>,
+ newlines_in_values: bool,
file_compression_type: FileCompressionType,
) -> Self {
let (projected_schema, projected_statistics,
projected_output_ordering) =
@@ -91,6 +94,7 @@ impl CsvExec {
delimiter,
quote,
escape,
+ newlines_in_values,
metrics: ExecutionPlanMetricsSet::new(),
file_compression_type,
cache,
@@ -126,6 +130,17 @@ impl CsvExec {
self.escape
}
+ /// Specifies whether newlines in (quoted) values are supported.
+ ///
+ /// Parsing newlines in quoted values may be affected by execution
behaviour such as
+ /// parallel file scanning. Setting this to `true` ensures that newlines
in values are
+ /// parsed successfully, which may reduce performance.
+ ///
+ /// The default behaviour depends on the
`datafusion.catalog.newlines_in_values` setting.
+ pub fn newlines_in_values(&self) -> bool {
+ self.newlines_in_values
+ }
+
fn output_partitioning_helper(file_scan_config: &FileScanConfig) ->
Partitioning {
Partitioning::UnknownPartitioning(file_scan_config.file_groups.len())
}
@@ -196,15 +211,15 @@ impl ExecutionPlan for CsvExec {
/// Redistribute files across partitions according to their size
/// See comments on [`FileGroupPartitioner`] for more detail.
///
- /// Return `None` if can't get repartitioned(empty/compressed file).
+ /// Return `None` if can't get repartitioned (empty, compressed file, or
`newlines_in_values` set).
fn repartitioned(
&self,
target_partitions: usize,
config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let repartition_file_min_size =
config.optimizer.repartition_file_min_size;
- // Parallel execution on compressed CSV file is not supported yet.
- if self.file_compression_type.is_compressed() {
+ // Parallel execution on compressed CSV files or files that must
support newlines in values is not supported yet.
+ if self.file_compression_type.is_compressed() ||
self.newlines_in_values {
return Ok(None);
}
@@ -589,6 +604,7 @@ mod tests {
b'"',
None,
None,
+ false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -658,6 +674,7 @@ mod tests {
b'"',
None,
None,
+ false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -727,6 +744,7 @@ mod tests {
b'"',
None,
None,
+ false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -793,6 +811,7 @@ mod tests {
b'"',
None,
None,
+ false,
file_compression_type.to_owned(),
);
assert_eq!(14, csv.base_config.file_schema.fields().len());
@@ -858,6 +877,7 @@ mod tests {
b'"',
None,
None,
+ false,
file_compression_type.to_owned(),
);
assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -953,6 +973,7 @@ mod tests {
b'"',
None,
None,
+ false,
file_compression_type.to_owned(),
);
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index afed5dd375..9791f23f96 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1472,6 +1472,7 @@ pub(crate) mod tests {
b'"',
None,
None,
+ false,
FileCompressionType::UNCOMPRESSED,
))
}
@@ -1496,6 +1497,7 @@ pub(crate) mod tests {
b'"',
None,
None,
+ false,
FileCompressionType::UNCOMPRESSED,
))
}
@@ -3770,6 +3772,7 @@ pub(crate) mod tests {
b'"',
None,
None,
+ false,
compression_type,
)),
vec![("a".to_string(), "a".to_string())],
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 84f8984317..d0d0c985b8 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -186,6 +186,7 @@ fn try_swapping_with_csv(
csv.quote(),
csv.escape(),
csv.comment(),
+ csv.newlines_in_values(),
csv.file_compression_type,
)) as _
})
@@ -1700,6 +1701,7 @@ mod tests {
0,
None,
None,
+ false,
FileCompressionType::UNCOMPRESSED,
))
}
@@ -1723,6 +1725,7 @@ mod tests {
0,
None,
None,
+ false,
FileCompressionType::UNCOMPRESSED,
))
}
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 013155b840..6565e3e7d0 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -1503,6 +1503,7 @@ mod tests {
b'"',
None,
None,
+ false,
FileCompressionType::UNCOMPRESSED,
))
}
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index e8550a79cb..5cb1b6ea70 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -99,6 +99,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir:
&Path) -> Result<Arc<Cs
b'"',
None,
None,
+ false,
FileCompressionType::UNCOMPRESSED,
)))
}
@@ -283,6 +284,7 @@ pub fn csv_exec_sorted(
0,
None,
None,
+ false,
FileCompressionType::UNCOMPRESSED,
))
}
@@ -339,6 +341,7 @@ pub fn csv_exec_ordered(
b'"',
None,
None,
+ false,
FileCompressionType::UNCOMPRESSED,
))
}
diff --git a/datafusion/core/tests/data/newlines_in_values.csv
b/datafusion/core/tests/data/newlines_in_values.csv
new file mode 100644
index 0000000000..de0cdb94a5
--- /dev/null
+++ b/datafusion/core/tests/data/newlines_in_values.csv
@@ -0,0 +1,13 @@
+id,message
+1,"hello
+world"
+2,"something
+else"
+3,"
+many
+lines
+make
+good test
+"
+4,unquoted
+value,end
diff --git a/datafusion/proto-common/proto/datafusion_common.proto
b/datafusion/proto-common/proto/datafusion_common.proto
index e2a405595f..ca95136dad 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -410,6 +410,7 @@ message CsvOptions {
string null_value = 12; // Optional representation of null value
bytes comment = 13; // Optional comment character as a byte
bytes double_quote = 14; // Indicates if quotes are doubled
+ bytes newlines_in_values = 15; // Indicates if newlines are supported in
values
}
// Options controlling CSV format
diff --git a/datafusion/proto-common/src/from_proto/mod.rs
b/datafusion/proto-common/src/from_proto/mod.rs
index 52ca5781dc..9191ff185a 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -860,6 +860,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions {
quote: proto_opts.quote[0],
escape: proto_opts.escape.first().copied(),
double_quote: proto_opts.has_header.first().map(|h| *h != 0),
+ newlines_in_values: proto_opts.newlines_in_values.first().map(|h|
*h != 0),
compression: proto_opts.compression().into(),
schema_infer_max_rec: proto_opts.schema_infer_max_rec as usize,
date_format: (!proto_opts.date_format.is_empty())
diff --git a/datafusion/proto-common/src/generated/pbjson.rs
b/datafusion/proto-common/src/generated/pbjson.rs
index be3cc58b23..4b34660ae2 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -1884,6 +1884,9 @@ impl serde::Serialize for CsvOptions {
if !self.double_quote.is_empty() {
len += 1;
}
+ if !self.newlines_in_values.is_empty() {
+ len += 1;
+ }
let mut struct_ser =
serializer.serialize_struct("datafusion_common.CsvOptions", len)?;
if !self.has_header.is_empty() {
#[allow(clippy::needless_borrow)]
@@ -1936,6 +1939,10 @@ impl serde::Serialize for CsvOptions {
#[allow(clippy::needless_borrow)]
struct_ser.serialize_field("doubleQuote",
pbjson::private::base64::encode(&self.double_quote).as_str())?;
}
+ if !self.newlines_in_values.is_empty() {
+ #[allow(clippy::needless_borrow)]
+ struct_ser.serialize_field("newlinesInValues",
pbjson::private::base64::encode(&self.newlines_in_values).as_str())?;
+ }
struct_ser.end()
}
}
@@ -1969,6 +1976,8 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
"comment",
"double_quote",
"doubleQuote",
+ "newlines_in_values",
+ "newlinesInValues",
];
#[allow(clippy::enum_variant_names)]
@@ -1987,6 +1996,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
NullValue,
Comment,
DoubleQuote,
+ NewlinesInValues,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) ->
std::result::Result<GeneratedField, D::Error>
@@ -2022,6 +2032,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
"nullValue" | "null_value" =>
Ok(GeneratedField::NullValue),
"comment" => Ok(GeneratedField::Comment),
"doubleQuote" | "double_quote" =>
Ok(GeneratedField::DoubleQuote),
+ "newlinesInValues" | "newlines_in_values" =>
Ok(GeneratedField::NewlinesInValues),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -2055,6 +2066,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
let mut null_value__ = None;
let mut comment__ = None;
let mut double_quote__ = None;
+ let mut newlines_in_values__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::HasHeader => {
@@ -2155,6 +2167,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
;
}
+ GeneratedField::NewlinesInValues => {
+ if newlines_in_values__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("newlinesInValues"));
+ }
+ newlines_in_values__ =
+
Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
+ ;
+ }
}
}
Ok(CsvOptions {
@@ -2172,6 +2192,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
null_value: null_value__.unwrap_or_default(),
comment: comment__.unwrap_or_default(),
double_quote: double_quote__.unwrap_or_default(),
+ newlines_in_values:
newlines_in_values__.unwrap_or_default(),
})
}
}
diff --git a/datafusion/proto-common/src/generated/prost.rs
b/datafusion/proto-common/src/generated/prost.rs
index b0674ff28d..9a2770997f 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -633,6 +633,9 @@ pub struct CsvOptions {
/// Indicates if quotes are doubled
#[prost(bytes = "vec", tag = "14")]
pub double_quote: ::prost::alloc::vec::Vec<u8>,
+ /// Indicates if newlines are supported in values
+ #[prost(bytes = "vec", tag = "15")]
+ pub newlines_in_values: ::prost::alloc::vec::Vec<u8>,
}
/// Options controlling CSV format
#[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto-common/src/to_proto/mod.rs
b/datafusion/proto-common/src/to_proto/mod.rs
index 705a479e01..9dcb65444a 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -900,6 +900,9 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
quote: vec![opts.quote],
escape: opts.escape.map_or_else(Vec::new, |e| vec![e]),
double_quote: opts.double_quote.map_or_else(Vec::new, |h| vec![h
as u8]),
+ newlines_in_values: opts
+ .newlines_in_values
+ .map_or_else(Vec::new, |h| vec![h as u8]),
compression: compression.into(),
schema_infer_max_rec: opts.schema_infer_max_rec as u64,
date_format: opts.date_format.clone().unwrap_or_default(),
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index dc551778c5..49d9f2dde6 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1007,6 +1007,7 @@ message CsvScanExecNode {
oneof optional_comment {
string comment = 6;
}
+ bool newlines_in_values = 7;
}
message AvroScanExecNode {
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs
b/datafusion/proto/src/generated/datafusion_proto_common.rs
index b0674ff28d..9a2770997f 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -633,6 +633,9 @@ pub struct CsvOptions {
/// Indicates if quotes are doubled
#[prost(bytes = "vec", tag = "14")]
pub double_quote: ::prost::alloc::vec::Vec<u8>,
+ /// Indicates if newlines are supported in values
+ #[prost(bytes = "vec", tag = "15")]
+ pub newlines_in_values: ::prost::alloc::vec::Vec<u8>,
}
/// Options controlling CSV format
#[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 8f77c24bd9..25f6646d2a 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -3605,6 +3605,9 @@ impl serde::Serialize for CsvScanExecNode {
if !self.quote.is_empty() {
len += 1;
}
+ if self.newlines_in_values {
+ len += 1;
+ }
if self.optional_escape.is_some() {
len += 1;
}
@@ -3624,6 +3627,9 @@ impl serde::Serialize for CsvScanExecNode {
if !self.quote.is_empty() {
struct_ser.serialize_field("quote", &self.quote)?;
}
+ if self.newlines_in_values {
+ struct_ser.serialize_field("newlinesInValues",
&self.newlines_in_values)?;
+ }
if let Some(v) = self.optional_escape.as_ref() {
match v {
csv_scan_exec_node::OptionalEscape::Escape(v) => {
@@ -3654,6 +3660,8 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
"hasHeader",
"delimiter",
"quote",
+ "newlines_in_values",
+ "newlinesInValues",
"escape",
"comment",
];
@@ -3664,6 +3672,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
HasHeader,
Delimiter,
Quote,
+ NewlinesInValues,
Escape,
Comment,
}
@@ -3691,6 +3700,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
"hasHeader" | "has_header" =>
Ok(GeneratedField::HasHeader),
"delimiter" => Ok(GeneratedField::Delimiter),
"quote" => Ok(GeneratedField::Quote),
+ "newlinesInValues" | "newlines_in_values" =>
Ok(GeneratedField::NewlinesInValues),
"escape" => Ok(GeneratedField::Escape),
"comment" => Ok(GeneratedField::Comment),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
@@ -3716,6 +3726,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
let mut has_header__ = None;
let mut delimiter__ = None;
let mut quote__ = None;
+ let mut newlines_in_values__ = None;
let mut optional_escape__ = None;
let mut optional_comment__ = None;
while let Some(k) = map_.next_key()? {
@@ -3744,6 +3755,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
}
quote__ = Some(map_.next_value()?);
}
+ GeneratedField::NewlinesInValues => {
+ if newlines_in_values__.is_some() {
+ return
Err(serde::de::Error::duplicate_field("newlinesInValues"));
+ }
+ newlines_in_values__ = Some(map_.next_value()?);
+ }
GeneratedField::Escape => {
if optional_escape__.is_some() {
return
Err(serde::de::Error::duplicate_field("escape"));
@@ -3763,6 +3780,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
has_header: has_header__.unwrap_or_default(),
delimiter: delimiter__.unwrap_or_default(),
quote: quote__.unwrap_or_default(),
+ newlines_in_values:
newlines_in_values__.unwrap_or_default(),
optional_escape: optional_escape__,
optional_comment: optional_comment__,
})
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 605c56fa94..ba288fe3d1 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1542,6 +1542,8 @@ pub struct CsvScanExecNode {
pub delimiter: ::prost::alloc::string::String,
#[prost(string, tag = "4")]
pub quote: ::prost::alloc::string::String,
+ #[prost(bool, tag = "7")]
+ pub newlines_in_values: bool,
#[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")]
pub optional_escape:
::core::option::Option<csv_scan_exec_node::OptionalEscape>,
#[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")]
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index 1220f42ded..9e17c19ecb 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -211,6 +211,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
} else {
None
},
+ scan.newlines_in_values,
FileCompressionType::UNCOMPRESSED,
))),
#[cfg(feature = "parquet")]
@@ -1579,6 +1580,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
} else {
None
},
+ newlines_in_values: exec.newlines_in_values(),
},
)),
});
diff --git a/datafusion/sqllogictest/test_files/csv_files.slt
b/datafusion/sqllogictest/test_files/csv_files.slt
index ca3bebe79f..f7f5aa54dd 100644
--- a/datafusion/sqllogictest/test_files/csv_files.slt
+++ b/datafusion/sqllogictest/test_files/csv_files.slt
@@ -293,3 +293,45 @@ id0 "value0"
id1 "value1"
id2 "value2"
id3 "value3"
+
+# Handling of newlines in values
+
+statement ok
+SET datafusion.optimizer.repartition_file_min_size = 1;
+
+statement ok
+CREATE EXTERNAL TABLE stored_table_with_newlines_in_values_unsafe (
+col1 TEXT,
+col2 TEXT
+) STORED AS CSV
+LOCATION '../core/tests/data/newlines_in_values.csv';
+
+statement error incorrect number of fields
+select * from stored_table_with_newlines_in_values_unsafe;
+
+statement ok
+CREATE EXTERNAL TABLE stored_table_with_newlines_in_values_safe (
+col1 TEXT,
+col2 TEXT
+) STORED AS CSV
+LOCATION '../core/tests/data/newlines_in_values.csv'
+OPTIONS ('format.newlines_in_values' 'true');
+
+query TT
+select * from stored_table_with_newlines_in_values_safe;
+----
+id message
+1
+01)hello
+02)world
+2
+01)something
+02)else
+3
+01)
+02)many
+03)lines
+04)make
+05)good test
+4 unquoted
+value end
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt
b/datafusion/sqllogictest/test_files/information_schema.slt
index f7b755b019..c8c0d1d45b 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -168,6 +168,7 @@ datafusion.catalog.format NULL
datafusion.catalog.has_header false
datafusion.catalog.information_schema true
datafusion.catalog.location NULL
+datafusion.catalog.newlines_in_values false
datafusion.execution.aggregate.scalar_update_factor 10
datafusion.execution.batch_size 8192
datafusion.execution.coalesce_batches true
@@ -252,6 +253,7 @@ datafusion.catalog.format NULL Type of `TableProvider` to
use when loading `defa
datafusion.catalog.has_header false Default value for `format.has_header` for
`CREATE EXTERNAL TABLE` if not specified explicitly in the statement.
datafusion.catalog.information_schema true Should DataFusion provide access to
`information_schema` virtual tables for displaying schema information
datafusion.catalog.location NULL Location scanned to load tables for `default`
schema
+datafusion.catalog.newlines_in_values false Specifies whether newlines in
(quoted) CSV values are supported. This is the default value for
`format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified
explicitly in the statement. Parsing newlines in quoted values may be affected
by execution behaviour such as parallel file scanning. Setting this to `true`
ensures that newlines in values are parsed successfully, which may reduce
performance.
datafusion.execution.aggregate.scalar_update_factor 10 Specifies the threshold
for using `ScalarValue`s to update accumulators during high-cardinality
aggregations for each input batch. The aggregation is considered
high-cardinality if the number of affected groups is greater than or equal to
`batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized
for updating accumulators, rather than the default batch-slice approach. This
can lead to performance improvements. By [...]
datafusion.execution.batch_size 8192 Default batch size while creating new
batches, it's especially useful for buffer-in-memory batches since creating
tiny batches would result in too much metadata memory consumption
datafusion.execution.coalesce_batches true When set to true, record batches
will be examined between each operator and small batches will be coalesced into
larger batches. This is helpful when there are highly selective filters or
joins that could produce tiny output batches. The target batch size is
determined by the configuration setting
diff --git a/docs/source/user-guide/configs.md
b/docs/source/user-guide/configs.md
index 8d3ecbc985..5e5de016e3 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -44,6 +44,7 @@ Environment variables are read during `SessionConfig`
initialisation so they mus
| datafusion.catalog.location |
NULL | Location scanned to load tables for `default`
schema
[...]
| datafusion.catalog.format |
NULL | Type of `TableProvider` to use when loading
`default` schema
[...]
| datafusion.catalog.has_header |
false | Default value for `format.has_header` for `CREATE
EXTERNAL TABLE` if not specified explicitly in the statement.
[...]
+| datafusion.catalog.newlines_in_values |
false | Specifies whether newlines in (quoted) CSV values
are supported. This is the default value for `format.newlines_in_values` for
`CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing
newlines in quoted values may be affected by execution behaviour such as
parallel file scanning. Setting this to `true` ensures that newlines in values
are parsed successfully, which [...]
| datafusion.execution.batch_size |
8192 | Default batch size while creating new batches, it's
especially useful for buffer-in-memory batches since creating tiny batches
would result in too much metadata memory consumption
[...]
| datafusion.execution.coalesce_batches |
true | When set to true, record batches will be examined
between each operator and small batches will be coalesced into larger batches.
This is helpful when there are highly selective filters or joins that could
produce tiny output batches. The target batch size is determined by the
configuration setting
[...]
| datafusion.execution.collect_statistics |
false | Should DataFusion collect statistics after listing
files
[...]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]