zhuqi-lucas commented on code in PR #19924:
URL: https://github.com/apache/datafusion/pull/19924#discussion_r2727580997


##########
datafusion/datasource-json/src/source.rs:
##########
@@ -222,27 +257,69 @@ impl FileOpener for JsonOpener {
                         }
                     };
 
-                    let reader = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build(BufReader::new(bytes))?;
-
-                    Ok(futures::stream::iter(reader)
-                        .map(|r| r.map_err(Into::into))
-                        .boxed())
+                    if newline_delimited {
+                        // NDJSON: use BufReader directly
+                        let reader = BufReader::new(bytes);
+                        let arrow_reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(reader)?;
+
+                        Ok(futures::stream::iter(arrow_reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    } else {
+                        // JSON array format: wrap with streaming converter
+                        // JsonArrayToNdjsonReader implements BufRead
+                        let ndjson_reader = JsonArrayToNdjsonReader::new(bytes);
+                        let arrow_reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(ndjson_reader)?;
+
+                        Ok(futures::stream::iter(arrow_reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    }
                 }
                 GetResultPayload::Stream(s) => {
-                    let s = s.map_err(DataFusionError::from);
-
-                    let decoder = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build_decoder()?;
-                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();
-
-                    let stream = deserialize_stream(
-                        input,
-                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                    );
-                    Ok(stream.map_err(Into::into).boxed())
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) streaming reader
+                        let s = s.map_err(DataFusionError::from);
+                        let decoder = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build_decoder()?;
+                        let input =
+                            file_compression_type.convert_stream(s.boxed())?.fuse();
+                        let stream = deserialize_stream(
+                            input,
+                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
+                        );
+                        Ok(stream.map_err(Into::into).boxed())
+                    } else {
+                        // JSON array format from stream: collect bytes first, then use streaming converter
+                        // Note: We still need to collect for streams, but the converter avoids
+                        // additional memory overhead from serde_json parsing
+                        let bytes = s

Review Comment:
   I changed this to a streaming implementation in the latest PR — thanks, @martin-g!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to