This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a0416dbfe Move `newlines_in_values` from `FileScanConfig` to 
`CsvSource` (#19313)
0a0416dbfe is described below

commit 0a0416dbfe3c8fef7f39f370b755e8f46b0d9d73
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Thu Dec 18 10:23:53 2025 -0600

    Move `newlines_in_values` from `FileScanConfig` to `CsvSource` (#19313)
    
    ## Summary
    
    This PR moves the CSV-specific `newlines_in_values` configuration option
    from `FileScanConfig` (a shared format-agnostic configuration) to
    `CsvSource` where it belongs.
    
    - Add `newlines_in_values` field and methods to `CsvSource`
    - Add `has_newlines_in_values()` method to `FileSource` trait (returns
    `false` by default)
    - Update `FileSource::repartitioned()` to use the new trait method
    - Remove `new_lines_in_values` from `FileScanConfig` and its builder
    - Update proto serialization to read from/write to `CsvSource`
    - Update tests and documentation
    - Add migration guide to `upgrading.md`
    
    Closes #18453
    
    ## Test plan
    
    - [x] All existing tests pass
    - [x] Doc tests pass
    - [x] Proto roundtrip tests pass
    - [x] Clippy clean
    
    🤖 Generated with [Claude Code](https://claude.com/claude-code)
    
    ---------
    
    Co-authored-by: Claude Opus 4.5 <[email protected]>
---
 .../core/src/datasource/physical_plan/csv.rs       |  6 ---
 datafusion/datasource-arrow/src/source.rs          |  4 +-
 datafusion/datasource-csv/src/file_format.rs       |  2 +-
 datafusion/datasource-csv/src/source.rs            | 14 +++++-
 datafusion/datasource/src/file.rs                  | 18 +++++++-
 datafusion/datasource/src/file_scan_config.rs      | 50 +++++++---------------
 datafusion/proto/src/physical_plan/mod.rs          |  4 +-
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  1 -
 docs/source/library-user-guide/upgrading.md        | 34 +++++++++++++++
 9 files changed, 84 insertions(+), 49 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs 
b/datafusion/core/src/datasource/physical_plan/csv.rs
index 892ae5b586..8193308b9c 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -128,7 +128,6 @@ mod tests {
         let config =
             FileScanConfigBuilder::from(partitioned_csv_config(file_groups, 
source)?)
                 .with_file_compression_type(file_compression_type)
-                .with_newlines_in_values(false)
                 .with_projection_indices(Some(vec![0, 2, 4]))?
                 .build();
 
@@ -200,7 +199,6 @@ mod tests {
             
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
         let config =
             FileScanConfigBuilder::from(partitioned_csv_config(file_groups, 
source)?)
-                .with_newlines_in_values(false)
                 .with_file_compression_type(file_compression_type.to_owned())
                 .with_projection_indices(Some(vec![4, 0, 2]))?
                 .build();
@@ -272,7 +270,6 @@ mod tests {
             
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
         let config =
             FileScanConfigBuilder::from(partitioned_csv_config(file_groups, 
source)?)
-                .with_newlines_in_values(false)
                 .with_file_compression_type(file_compression_type.to_owned())
                 .with_limit(Some(5))
                 .build();
@@ -343,7 +340,6 @@ mod tests {
             
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
         let config =
             FileScanConfigBuilder::from(partitioned_csv_config(file_groups, 
source)?)
-                .with_newlines_in_values(false)
                 .with_file_compression_type(file_compression_type.to_owned())
                 .with_limit(Some(5))
                 .build();
@@ -412,7 +408,6 @@ mod tests {
             
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
         let config =
             FileScanConfigBuilder::from(partitioned_csv_config(file_groups, 
source)?)
-                .with_newlines_in_values(false)
                 .with_file_compression_type(file_compression_type.to_owned())
                 // We should be able to project on the partition column
                 // Which is supposed to be after the file fields
@@ -518,7 +513,6 @@ mod tests {
             
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
         let config =
             FileScanConfigBuilder::from(partitioned_csv_config(file_groups, 
source)?)
-                .with_newlines_in_values(false)
                 .with_file_compression_type(file_compression_type.to_owned())
                 .build();
         let csv = DataSourceExec::from_data_source(config);
diff --git a/datafusion/datasource-arrow/src/source.rs 
b/datafusion/datasource-arrow/src/source.rs
index 892ab01b23..ed0238d788 100644
--- a/datafusion/datasource-arrow/src/source.rs
+++ b/datafusion/datasource-arrow/src/source.rs
@@ -460,9 +460,7 @@ impl FileSource for ArrowSource {
                 // Use the default trait implementation logic for file format
                 use datafusion_datasource::file_groups::FileGroupPartitioner;
 
-                if config.file_compression_type.is_compressed()
-                    || config.new_lines_in_values
-                {
+                if config.file_compression_type.is_compressed() {
                     return Ok(None);
                 }
 
diff --git a/datafusion/datasource-csv/src/file_format.rs 
b/datafusion/datasource-csv/src/file_format.rs
index 7729f13901..1bb8679102 100644
--- a/datafusion/datasource-csv/src/file_format.rs
+++ b/datafusion/datasource-csv/src/file_format.rs
@@ -437,6 +437,7 @@ impl FileFormat for CsvFormat {
 
         let mut csv_options = self.options.clone();
         csv_options.has_header = Some(has_header);
+        csv_options.newlines_in_values = Some(newlines_in_values);
 
         // Get the existing CsvSource and update its options
         // We need to preserve the table_schema from the original source 
(which includes partition columns)
@@ -449,7 +450,6 @@ impl FileFormat for CsvFormat {
 
         let config = FileScanConfigBuilder::from(conf)
             .with_file_compression_type(self.options.compression.into())
-            .with_newlines_in_values(newlines_in_values)
             .with_source(source)
             .build();
 
diff --git a/datafusion/datasource-csv/src/source.rs 
b/datafusion/datasource-csv/src/source.rs
index b318d89189..2aed31f1bb 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -72,6 +72,7 @@ use tokio::io::AsyncWriteExt;
 ///     has_header: Some(true),
 ///     delimiter: b',',
 ///     quote: b'"',
+///     newlines_in_values: Some(true), // The file contains newlines in values
 ///     ..Default::default()
 /// };
 /// let source = Arc::new(CsvSource::new(file_schema.clone())
@@ -81,7 +82,6 @@ use tokio::io::AsyncWriteExt;
 /// // Create a DataSourceExec for reading the first 100MB of `file1.csv`
 /// let config = FileScanConfigBuilder::new(object_store_url, source)
 ///     .with_file(PartitionedFile::new("file1.csv", 100*1024*1024))
-///     .with_newlines_in_values(true) // The file contains newlines in values;
 ///     .build();
 /// let exec = (DataSourceExec::from_data_source(config));
 /// ```
@@ -176,6 +176,11 @@ impl CsvSource {
         conf.options.truncated_rows = Some(truncate_rows);
         conf
     }
+
+    /// Whether values may contain newline characters
+    pub fn newlines_in_values(&self) -> bool {
+        self.options.newlines_in_values.unwrap_or(false)
+    }
 }
 
 impl CsvSource {
@@ -297,6 +302,13 @@ impl FileSource for CsvSource {
     fn file_type(&self) -> &str {
         "csv"
     }
+
+    fn supports_repartitioning(&self) -> bool {
+        // Cannot repartition if values may contain newlines, as record
+        // boundaries cannot be determined by byte offset alone
+        !self.options.newlines_in_values.unwrap_or(false)
+    }
+
     fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
diff --git a/datafusion/datasource/src/file.rs 
b/datafusion/datasource/src/file.rs
index 2c69987f91..ce17423455 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -86,6 +86,21 @@ pub trait FileSource: Send + Sync {
         Ok(())
     }
 
+    /// Returns whether this file source supports repartitioning files by byte 
ranges.
+    ///
+    /// When this returns `true`, files can be split into multiple partitions
+    /// based on byte offsets for parallel reading.
+    ///
+    /// When this returns `false`, files cannot be repartitioned (e.g., CSV 
files
+    /// with `newlines_in_values` enabled cannot be split because record 
boundaries
+    /// cannot be determined by byte offset alone).
+    ///
+    /// The default implementation returns `true`. File sources that cannot 
support
+    /// repartitioning should override this method.
+    fn supports_repartitioning(&self) -> bool {
+        true
+    }
+
     /// If supported by the [`FileSource`], redistribute files across 
partitions
     /// according to their size. Allows custom file formats to implement their
     /// own repartitioning logic.
@@ -99,7 +114,8 @@ pub trait FileSource: Send + Sync {
         output_ordering: Option<LexOrdering>,
         config: &FileScanConfig,
     ) -> Result<Option<FileScanConfig>> {
-        if config.file_compression_type.is_compressed() || 
config.new_lines_in_values {
+        if config.file_compression_type.is_compressed() || 
!self.supports_repartitioning()
+        {
             return Ok(None);
         }
 
diff --git a/datafusion/datasource/src/file_scan_config.rs 
b/datafusion/datasource/src/file_scan_config.rs
index a9a00c227c..d4f81df793 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -160,8 +160,6 @@ pub struct FileScanConfig {
     pub output_ordering: Vec<LexOrdering>,
     /// File compression type
     pub file_compression_type: FileCompressionType,
-    /// Are new lines in values supported for CSVOptions
-    pub new_lines_in_values: bool,
     /// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
     pub file_source: Arc<dyn FileSource>,
     /// Batch size while creating new batches
@@ -251,7 +249,6 @@ pub struct FileScanConfigBuilder {
     statistics: Option<Statistics>,
     output_ordering: Vec<LexOrdering>,
     file_compression_type: Option<FileCompressionType>,
-    new_lines_in_values: Option<bool>,
     batch_size: Option<usize>,
     expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
     partitioned_by_file_group: bool,
@@ -275,7 +272,6 @@ impl FileScanConfigBuilder {
             statistics: None,
             output_ordering: vec![],
             file_compression_type: None,
-            new_lines_in_values: None,
             limit: None,
             constraints: None,
             batch_size: None,
@@ -414,16 +410,6 @@ impl FileScanConfigBuilder {
         self
     }
 
-    /// Set whether new lines in values are supported for CSVOptions
-    ///
-    /// Parsing newlines in quoted values may be affected by execution 
behaviour such as
-    /// parallel file scanning. Setting this to `true` ensures that newlines 
in values are
-    /// parsed successfully, which may reduce performance.
-    pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> 
Self {
-        self.new_lines_in_values = Some(new_lines_in_values);
-        self
-    }
-
     /// Set the batch_size property
     pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
         self.batch_size = batch_size;
@@ -473,7 +459,6 @@ impl FileScanConfigBuilder {
             statistics,
             output_ordering,
             file_compression_type,
-            new_lines_in_values,
             batch_size,
             expr_adapter_factory: expr_adapter,
             partitioned_by_file_group,
@@ -485,7 +470,6 @@ impl FileScanConfigBuilder {
         });
         let file_compression_type =
             file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
-        let new_lines_in_values = new_lines_in_values.unwrap_or(false);
 
         FileScanConfig {
             object_store_url,
@@ -495,7 +479,6 @@ impl FileScanConfigBuilder {
             file_groups,
             output_ordering,
             file_compression_type,
-            new_lines_in_values,
             batch_size,
             expr_adapter_factory: expr_adapter,
             statistics,
@@ -513,7 +496,6 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
             statistics: Some(config.statistics),
             output_ordering: config.output_ordering,
             file_compression_type: Some(config.file_compression_type),
-            new_lines_in_values: Some(config.new_lines_in_values),
             limit: config.limit,
             constraints: Some(config.constraints),
             batch_size: config.batch_size,
@@ -945,6 +927,22 @@ impl FileScanConfig {
         Ok(())
     }
 
+    /// Returns whether newlines in values are supported.
+    ///
+    /// This method always returns `false`. The actual newlines_in_values 
setting
+    /// has been moved to [`CsvSource`] and should be accessed via
+    /// [`CsvSource::csv_options()`] instead.
+    ///
+    /// [`CsvSource`]: 
https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html
+    /// [`CsvSource::csv_options()`]: 
https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html#method.csv_options
+    #[deprecated(
+        since = "52.0.0",
+        note = "newlines_in_values has moved to CsvSource. Access it via 
CsvSource::csv_options().newlines_in_values instead. It will be removed in 
58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
+    )]
+    pub fn newlines_in_values(&self) -> bool {
+        false
+    }
+
     #[deprecated(
         since = "52.0.0",
         note = "This method is no longer used, use eq_properties instead. It 
will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes 
first."
@@ -954,17 +952,6 @@ impl FileScanConfig {
         props.constraints().clone()
     }
 
-    /// Specifies whether newlines in (quoted) values are supported.
-    ///
-    /// Parsing newlines in quoted values may be affected by execution 
behaviour such as
-    /// parallel file scanning. Setting this to `true` ensures that newlines 
in values are
-    /// parsed successfully, which may reduce performance.
-    ///
-    /// The default behaviour depends on the 
`datafusion.catalog.newlines_in_values` setting.
-    pub fn newlines_in_values(&self) -> bool {
-        self.new_lines_in_values
-    }
-
     #[deprecated(
         since = "52.0.0",
         note = "This method is no longer used, use eq_properties instead. It 
will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes 
first."
@@ -1793,7 +1780,6 @@ mod tests {
                 .into(),
             ])
             .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
-            .with_newlines_in_values(true)
             .build();
 
         // Verify the built config has all the expected values
@@ -1820,7 +1806,6 @@ mod tests {
             config.file_compression_type,
             FileCompressionType::UNCOMPRESSED
         );
-        assert!(config.new_lines_in_values);
         assert_eq!(config.output_ordering.len(), 1);
     }
 
@@ -1915,7 +1900,6 @@ mod tests {
             config.file_compression_type,
             FileCompressionType::UNCOMPRESSED
         );
-        assert!(!config.new_lines_in_values);
         assert!(config.output_ordering.is_empty());
         assert!(config.constraints.is_empty());
 
@@ -1963,7 +1947,6 @@ mod tests {
         .with_limit(Some(10))
         .with_file(file.clone())
         .with_constraints(Constraints::default())
-        .with_newlines_in_values(true)
         .build();
 
         // Create a new builder from the config
@@ -1993,7 +1976,6 @@ mod tests {
             "test_file.parquet"
         );
         assert_eq!(new_config.constraints, Constraints::default());
-        assert!(new_config.new_lines_in_values);
     }
 
     #[test]
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index 5c64dfbce3..4ff90b61ee 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -631,6 +631,7 @@ impl protobuf::PhysicalPlanNode {
             has_header: Some(scan.has_header),
             delimiter: str_to_byte(&scan.delimiter, "delimiter")?,
             quote: str_to_byte(&scan.quote, "quote")?,
+            newlines_in_values: Some(scan.newlines_in_values),
             ..Default::default()
         };
         let source = Arc::new(
@@ -646,7 +647,6 @@ impl protobuf::PhysicalPlanNode {
             extension_codec,
             source,
         )?)
-        .with_newlines_in_values(scan.newlines_in_values)
         .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
         .build();
         Ok(DataSourceExec::from_data_source(conf))
@@ -2631,7 +2631,7 @@ impl protobuf::PhysicalPlanNode {
                             } else {
                                 None
                             },
-                            newlines_in_values: maybe_csv.newlines_in_values(),
+                            newlines_in_values: 
csv_config.newlines_in_values(),
                             truncate_rows: csv_config.truncate_rows(),
                         },
                     )),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index fa505e6f15..f9babeba56 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -932,7 +932,6 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() 
-> Result<()> {
         FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), 
file_source)
             .with_projection_indices(Some(vec![0, 1]))?
             .with_file_group(FileGroup::new(vec![file_group]))
-            .with_newlines_in_values(false)
             .build();
 
     roundtrip_test(DataSourceExec::from_data_source(scan_config))
diff --git a/docs/source/library-user-guide/upgrading.md 
b/docs/source/library-user-guide/upgrading.md
index 159bd3e4e7..bd9fb3f55a 100644
--- a/docs/source/library-user-guide/upgrading.md
+++ b/docs/source/library-user-guide/upgrading.md
@@ -57,6 +57,40 @@ See <https://github.com/apache/datafusion/issues/19056> for 
more details.
 
 Note that the internal API has changed to use a trait `ListFilesCache` instead 
of a type alias.
 
+### `newlines_in_values` moved from `FileScanConfig` to `CsvOptions`
+
+The CSV-specific `newlines_in_values` configuration option has been moved from 
`FileScanConfig` to `CsvOptions`, as it only applies to CSV file parsing.
+
+**Who is affected:**
+
+- Users who set `newlines_in_values` via 
`FileScanConfigBuilder::with_newlines_in_values()`
+
+**Migration guide:**
+
+Set `newlines_in_values` in `CsvOptions` instead of on `FileScanConfigBuilder`:
+
+**Before:**
+
+```rust,ignore
+let source = Arc::new(CsvSource::new(file_schema.clone()));
+let config = FileScanConfigBuilder::new(object_store_url, source)
+    .with_newlines_in_values(true)
+    .build();
+```
+
+**After:**
+
+```rust,ignore
+let options = CsvOptions {
+    newlines_in_values: Some(true),
+    ..Default::default()
+};
+let source = Arc::new(CsvSource::new(file_schema.clone())
+    .with_csv_options(options));
+let config = FileScanConfigBuilder::new(object_store_url, source)
+    .build();
+```
+
 ### Removal of `pyarrow` feature
 
 The `pyarrow` feature flag has been removed. This feature has been migrated to


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to