This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c7fb0a  fix: fix incorrect splitting with line delimited streaming (#700)
8c7fb0a is described below

commit 8c7fb0a1e2fc794e375b205eab26b73d31032e2d
Author: bboissin <[email protected]>
AuthorDate: Tue Jun 2 11:53:17 2026 +0200

    fix: fix incorrect splitting with line delimited streaming (#700)
    
    * fix: fix incorrect splitting with line delimited streaming
    
    In some cases, valid CSV read through DataFusion would return
    `Generic { store: "LineDelimiter", source: UnterminatedString }` due to
    incorrect record-boundary logic.
    
    `record_ends` is a double-ended iterator, so calling `next_back()` ran the
    quoting/escaping logic in reverse, corrupting its internal state.
    
    * Use `last()` instead of collecting into a `Vec`
    
    * Clippy
    
    ---------
    
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
 src/delimited.rs | 36 +++++++++++++++++++++++++++++++++++-
 1 file changed, 35 insertions(+), 1 deletion(-)

diff --git a/src/delimited.rs b/src/delimited.rs
index b9f8842..dbdac39 100644
--- a/src/delimited.rs
+++ b/src/delimited.rs
@@ -109,7 +109,7 @@ impl LineDelimiter {
                 }
             },
         };
-        let end_offset = record_ends.next_back().unwrap_or(start_offset);
+        let end_offset = record_ends.last().unwrap_or(start_offset);
         if start_offset != end_offset {
             self.complete.push_back(val.slice(start_offset..end_offset));
         }
@@ -270,4 +270,38 @@ mod tests {
             ]
         )
     }
+
+    #[tokio::test]
+    async fn test_delimiter_quotes_stream() {
+        let input = vec!["x,y,z\n,\"new\nline\",\"with ", "space\""];
+        let input_stream =
+            futures_util::stream::iter(input.into_iter().map(|s| 
Ok(Bytes::from(s))));
+        let stream = newline_delimited_stream(input_stream);
+
+        let results: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(
+            results,
+            vec![
+                Bytes::from("x,y,z\n"),
+                Bytes::from(",\"new\nline\",\"with space\"")
+            ]
+        )
+    }
+
+    #[tokio::test]
+    async fn test_delimiter_escape_stream() {
+        let input = vec!["hello\n\n\"\\ttabulated\"", "world"];
+        let input_stream =
+            futures_util::stream::iter(input.into_iter().map(|s| 
Ok(Bytes::from(s))));
+        let stream = newline_delimited_stream(input_stream);
+
+        let results: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(
+            results,
+            vec![
+                Bytes::from("hello\n\n"),
+                Bytes::from("\"\\ttabulated\"world")
+            ]
+        )
+    }
 }

Reply via email to