This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0a0416dbfe Move `newlines_in_values` from `FileScanConfig` to `CsvSource` (#19313)
0a0416dbfe is described below
commit 0a0416dbfe3c8fef7f39f370b755e8f46b0d9d73
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Thu Dec 18 10:23:53 2025 -0600
Move `newlines_in_values` from `FileScanConfig` to `CsvSource` (#19313)
## Summary
This PR moves the CSV-specific `newlines_in_values` configuration option
from `FileScanConfig` (a shared format-agnostic configuration) to
`CsvSource` where it belongs.
- Add a `newlines_in_values()` accessor to `CsvSource`, backed by its `CsvOptions`
- Add a `supports_repartitioning()` method to the `FileSource` trait (returns `true` by default; `CsvSource` overrides it)
- Update `FileSource::repartitioned()` to use the new trait method
- Remove `new_lines_in_values` from `FileScanConfig` and its builder
- Update proto serialization to read from/write to `CsvSource`
- Update tests and documentation
- Add migration guide to `upgrading.md`
Closes #18453
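
For reviewers, a minimal sketch of the resulting API (it mirrors the migration example added to `upgrading.md` below; `file_schema` is a placeholder schema, and the usual DataFusion imports are assumed):

```rust,ignore
// Sketch only, not part of this diff.
let options = CsvOptions {
    newlines_in_values: Some(true),
    ..Default::default()
};
let source = Arc::new(CsvSource::new(file_schema).with_csv_options(options));
// The CSV-specific accessor now lives on the source...
assert!(source.newlines_in_values());
// ...and the format-agnostic trait hook reports that byte-range
// repartitioning is unsafe for this source.
assert!(!source.supports_repartitioning());
```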
## Test plan
- [x] All existing tests pass
- [x] Doc tests pass
- [x] Proto roundtrip tests pass
- [x] Clippy clean
🤖 Generated with [Claude Code](https://claude.com/claude-code)
---------
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
.../core/src/datasource/physical_plan/csv.rs | 6 ---
datafusion/datasource-arrow/src/source.rs | 4 +-
datafusion/datasource-csv/src/file_format.rs | 2 +-
datafusion/datasource-csv/src/source.rs | 14 +++++-
datafusion/datasource/src/file.rs | 18 +++++++-
datafusion/datasource/src/file_scan_config.rs | 50 +++++++---------------
datafusion/proto/src/physical_plan/mod.rs | 4 +-
.../proto/tests/cases/roundtrip_physical_plan.rs | 1 -
docs/source/library-user-guide/upgrading.md | 34 +++++++++++++++
9 files changed, 84 insertions(+), 49 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 892ae5b586..8193308b9c 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -128,7 +128,6 @@ mod tests {
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
.with_file_compression_type(file_compression_type)
- .with_newlines_in_values(false)
.with_projection_indices(Some(vec![0, 2, 4]))?
.build();
@@ -200,7 +199,6 @@ mod tests {
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
- .with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.with_projection_indices(Some(vec![4, 0, 2]))?
.build();
@@ -272,7 +270,6 @@ mod tests {
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
- .with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.with_limit(Some(5))
.build();
@@ -343,7 +340,6 @@ mod tests {
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
- .with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.with_limit(Some(5))
.build();
@@ -412,7 +408,6 @@ mod tests {
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
- .with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
// We should be able to project on the partition column
// Which is supposed to be after the file fields
@@ -518,7 +513,6 @@ mod tests {
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
let config =
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
- .with_newlines_in_values(false)
.with_file_compression_type(file_compression_type.to_owned())
.build();
let csv = DataSourceExec::from_data_source(config);
diff --git a/datafusion/datasource-arrow/src/source.rs b/datafusion/datasource-arrow/src/source.rs
index 892ab01b23..ed0238d788 100644
--- a/datafusion/datasource-arrow/src/source.rs
+++ b/datafusion/datasource-arrow/src/source.rs
@@ -460,9 +460,7 @@ impl FileSource for ArrowSource {
// Use the default trait implementation logic for file format
use datafusion_datasource::file_groups::FileGroupPartitioner;
- if config.file_compression_type.is_compressed()
- || config.new_lines_in_values
- {
+ if config.file_compression_type.is_compressed() {
return Ok(None);
}
diff --git a/datafusion/datasource-csv/src/file_format.rs b/datafusion/datasource-csv/src/file_format.rs
index 7729f13901..1bb8679102 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -437,6 +437,7 @@ impl FileFormat for CsvFormat {
let mut csv_options = self.options.clone();
csv_options.has_header = Some(has_header);
+ csv_options.newlines_in_values = Some(newlines_in_values);
// Get the existing CsvSource and update its options
// We need to preserve the table_schema from the original source (which includes partition columns)
@@ -449,7 +450,6 @@ impl FileFormat for CsvFormat {
let config = FileScanConfigBuilder::from(conf)
.with_file_compression_type(self.options.compression.into())
- .with_newlines_in_values(newlines_in_values)
.with_source(source)
.build();
diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index b318d89189..2aed31f1bb 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -72,6 +72,7 @@ use tokio::io::AsyncWriteExt;
/// has_header: Some(true),
/// delimiter: b',',
/// quote: b'"',
+/// newlines_in_values: Some(true), // The file contains newlines in values
/// ..Default::default()
/// };
/// let source = Arc::new(CsvSource::new(file_schema.clone())
@@ -81,7 +82,6 @@ use tokio::io::AsyncWriteExt;
/// // Create a DataSourceExec for reading the first 100MB of `file1.csv`
/// let config = FileScanConfigBuilder::new(object_store_url, source)
/// .with_file(PartitionedFile::new("file1.csv", 100*1024*1024))
-/// .with_newlines_in_values(true) // The file contains newlines in values;
/// .build();
/// let exec = (DataSourceExec::from_data_source(config));
/// ```
@@ -176,6 +176,11 @@ impl CsvSource {
conf.options.truncated_rows = Some(truncate_rows);
conf
}
+
+ /// Whether values may contain newline characters
+ pub fn newlines_in_values(&self) -> bool {
+ self.options.newlines_in_values.unwrap_or(false)
+ }
}
impl CsvSource {
@@ -297,6 +302,13 @@ impl FileSource for CsvSource {
fn file_type(&self) -> &str {
"csv"
}
+
+ fn supports_repartitioning(&self) -> bool {
+ // Cannot repartition if values may contain newlines, as record
+ // boundaries cannot be determined by byte offset alone
+ !self.options.newlines_in_values.unwrap_or(false)
+ }
+
fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index 2c69987f91..ce17423455 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -86,6 +86,21 @@ pub trait FileSource: Send + Sync {
Ok(())
}
+ /// Returns whether this file source supports repartitioning files by byte ranges.
+ ///
+ /// When this returns `true`, files can be split into multiple partitions
+ /// based on byte offsets for parallel reading.
+ ///
+ /// When this returns `false`, files cannot be repartitioned (e.g., CSV files
+ /// with `newlines_in_values` enabled cannot be split because record boundaries
+ /// cannot be determined by byte offset alone).
+ ///
+ /// The default implementation returns `true`. File sources that cannot support
+ /// repartitioning should override this method.
+ fn supports_repartitioning(&self) -> bool {
+ true
+ }
+
/// If supported by the [`FileSource`], redistribute files across partitions
/// according to their size. Allows custom file formats to implement their
/// own repartitioning logic.
@@ -99,7 +114,8 @@ pub trait FileSource: Send + Sync {
output_ordering: Option<LexOrdering>,
config: &FileScanConfig,
) -> Result<Option<FileScanConfig>> {
- if config.file_compression_type.is_compressed() || config.new_lines_in_values {
+ if config.file_compression_type.is_compressed() || !self.supports_repartitioning()
+ {
return Ok(None);
}
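
The hook above is what lets a `FileSource` opt out of byte-range splitting without `FileScanConfig` carrying format-specific state. A minimal sketch of an override for a custom source (`MyLineSource` is hypothetical, not part of this diff):

```rust,ignore
// Hypothetical source whose records may span arbitrary byte ranges.
struct MyLineSource { /* ... */ }

impl FileSource for MyLineSource {
    // ...other required trait methods elided...

    fn supports_repartitioning(&self) -> bool {
        // Record boundaries cannot be recovered from a byte offset alone,
        // so the default `repartitioned()` must keep whole files together.
        false
    }
}
```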
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index a9a00c227c..d4f81df793 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -160,8 +160,6 @@ pub struct FileScanConfig {
pub output_ordering: Vec<LexOrdering>,
/// File compression type
pub file_compression_type: FileCompressionType,
- /// Are new lines in values supported for CSVOptions
- pub new_lines_in_values: bool,
/// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
pub file_source: Arc<dyn FileSource>,
/// Batch size while creating new batches
@@ -251,7 +249,6 @@ pub struct FileScanConfigBuilder {
statistics: Option<Statistics>,
output_ordering: Vec<LexOrdering>,
file_compression_type: Option<FileCompressionType>,
- new_lines_in_values: Option<bool>,
batch_size: Option<usize>,
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
partitioned_by_file_group: bool,
@@ -275,7 +272,6 @@ impl FileScanConfigBuilder {
statistics: None,
output_ordering: vec![],
file_compression_type: None,
- new_lines_in_values: None,
limit: None,
constraints: None,
batch_size: None,
@@ -414,16 +410,6 @@ impl FileScanConfigBuilder {
self
}
- /// Set whether new lines in values are supported for CSVOptions
- ///
- /// Parsing newlines in quoted values may be affected by execution behaviour such as
- /// parallel file scanning. Setting this to `true` ensures that newlines in values are
- /// parsed successfully, which may reduce performance.
- pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
- self.new_lines_in_values = Some(new_lines_in_values);
- self
- }
-
/// Set the batch_size property
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
self.batch_size = batch_size;
@@ -473,7 +459,6 @@ impl FileScanConfigBuilder {
statistics,
output_ordering,
file_compression_type,
- new_lines_in_values,
batch_size,
expr_adapter_factory: expr_adapter,
partitioned_by_file_group,
@@ -485,7 +470,6 @@ impl FileScanConfigBuilder {
});
let file_compression_type =
file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
- let new_lines_in_values = new_lines_in_values.unwrap_or(false);
FileScanConfig {
object_store_url,
@@ -495,7 +479,6 @@ impl FileScanConfigBuilder {
file_groups,
output_ordering,
file_compression_type,
- new_lines_in_values,
batch_size,
expr_adapter_factory: expr_adapter,
statistics,
@@ -513,7 +496,6 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
statistics: Some(config.statistics),
output_ordering: config.output_ordering,
file_compression_type: Some(config.file_compression_type),
- new_lines_in_values: Some(config.new_lines_in_values),
limit: config.limit,
constraints: Some(config.constraints),
batch_size: config.batch_size,
@@ -945,6 +927,22 @@ impl FileScanConfig {
Ok(())
}
+ /// Returns whether newlines in values are supported.
+ ///
+ /// This method always returns `false`. The actual newlines_in_values setting
+ /// has been moved to [`CsvSource`] and should be accessed via
+ /// [`CsvSource::csv_options()`] instead.
+ ///
+ /// [`CsvSource`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html
+ /// [`CsvSource::csv_options()`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html#method.csv_options
+ #[deprecated(
+ since = "52.0.0",
+ note = "newlines_in_values has moved to CsvSource. Access it via
CsvSource::csv_options().newlines_in_values instead. It will be removed in
58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
+ )]
+ pub fn newlines_in_values(&self) -> bool {
+ false
+ }
+
#[deprecated(
since = "52.0.0",
note = "This method is no longer used, use eq_properties instead. It
will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes
first."
@@ -954,17 +952,6 @@ impl FileScanConfig {
props.constraints().clone()
}
- /// Specifies whether newlines in (quoted) values are supported.
- ///
- /// Parsing newlines in quoted values may be affected by execution behaviour such as
- /// parallel file scanning. Setting this to `true` ensures that newlines in values are
- /// parsed successfully, which may reduce performance.
- ///
- /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
- pub fn newlines_in_values(&self) -> bool {
- self.new_lines_in_values
- }
-
#[deprecated(
since = "52.0.0",
note = "This method is no longer used, use eq_properties instead. It
will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes
first."
@@ -1793,7 +1780,6 @@ mod tests {
.into(),
])
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
- .with_newlines_in_values(true)
.build();
// Verify the built config has all the expected values
@@ -1820,7 +1806,6 @@ mod tests {
config.file_compression_type,
FileCompressionType::UNCOMPRESSED
);
- assert!(config.new_lines_in_values);
assert_eq!(config.output_ordering.len(), 1);
}
@@ -1915,7 +1900,6 @@ mod tests {
config.file_compression_type,
FileCompressionType::UNCOMPRESSED
);
- assert!(!config.new_lines_in_values);
assert!(config.output_ordering.is_empty());
assert!(config.constraints.is_empty());
@@ -1963,7 +1947,6 @@ mod tests {
.with_limit(Some(10))
.with_file(file.clone())
.with_constraints(Constraints::default())
- .with_newlines_in_values(true)
.build();
// Create a new builder from the config
@@ -1993,7 +1976,6 @@ mod tests {
"test_file.parquet"
);
assert_eq!(new_config.constraints, Constraints::default());
- assert!(new_config.new_lines_in_values);
}
#[test]
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 5c64dfbce3..4ff90b61ee 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -631,6 +631,7 @@ impl protobuf::PhysicalPlanNode {
has_header: Some(scan.has_header),
delimiter: str_to_byte(&scan.delimiter, "delimiter")?,
quote: str_to_byte(&scan.quote, "quote")?,
+ newlines_in_values: Some(scan.newlines_in_values),
..Default::default()
};
let source = Arc::new(
@@ -646,7 +647,6 @@ impl protobuf::PhysicalPlanNode {
extension_codec,
source,
)?)
- .with_newlines_in_values(scan.newlines_in_values)
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
.build();
Ok(DataSourceExec::from_data_source(conf))
@@ -2631,7 +2631,7 @@ impl protobuf::PhysicalPlanNode {
} else {
None
},
- newlines_in_values: maybe_csv.newlines_in_values(),
+ newlines_in_values: csv_config.newlines_in_values(),
truncate_rows: csv_config.truncate_rows(),
},
)),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index fa505e6f15..f9babeba56 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -932,7 +932,6 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
.with_projection_indices(Some(vec![0, 1]))?
.with_file_group(FileGroup::new(vec![file_group]))
- .with_newlines_in_values(false)
.build();
roundtrip_test(DataSourceExec::from_data_source(scan_config))
diff --git a/docs/source/library-user-guide/upgrading.md b/docs/source/library-user-guide/upgrading.md
index 159bd3e4e7..bd9fb3f55a 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -57,6 +57,40 @@ See <https://github.com/apache/datafusion/issues/19056> for more details.
Note that the internal API has changed to use a trait `ListFilesCache` instead of a type alias.
+### `newlines_in_values` moved from `FileScanConfig` to `CsvOptions`
+
+The CSV-specific `newlines_in_values` configuration option has been moved from `FileScanConfig` to `CsvOptions`, as it only applies to CSV file parsing.
+
+**Who is affected:**
+
+- Users who set `newlines_in_values` via `FileScanConfigBuilder::with_newlines_in_values()`
+
+**Migration guide:**
+
+Set `newlines_in_values` in `CsvOptions` instead of on `FileScanConfigBuilder`:
+
+**Before:**
+
+```rust,ignore
+let source = Arc::new(CsvSource::new(file_schema.clone()));
+let config = FileScanConfigBuilder::new(object_store_url, source)
+ .with_newlines_in_values(true)
+ .build();
+```
+
+**After:**
+
+```rust,ignore
+let options = CsvOptions {
+ newlines_in_values: Some(true),
+ ..Default::default()
+};
+let source = Arc::new(CsvSource::new(file_schema.clone())
+ .with_csv_options(options));
+let config = FileScanConfigBuilder::new(object_store_url, source)
+ .build();
+```
+
### Removal of `pyarrow` feature
The `pyarrow` feature flag has been removed. This feature has been migrated to
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]