martin-g commented on code in PR #19924:
URL: https://github.com/apache/datafusion/pull/19924#discussion_r2724110174


##########
datafusion/datasource-json/src/source.rs:
##########
@@ -222,33 +254,97 @@ impl FileOpener for JsonOpener {
                         }
                     };
 
-                    let reader = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build(BufReader::new(bytes))?;
-
-                    Ok(futures::stream::iter(reader)
-                        .map(|r| r.map_err(Into::into))
-                        .boxed())
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) reader
+                        let reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(BufReader::new(bytes))?;
+                        Ok(futures::stream::iter(reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    } else {
+                        // JSON array format reader
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(bytes),
+                            schema,
+                            batch_size,
+                        )?;
+                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    }
                 }
                 GetResultPayload::Stream(s) => {
-                    let s = s.map_err(DataFusionError::from);
-
-                    let decoder = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build_decoder()?;
-                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();
-
-                    let stream = deserialize_stream(
-                        input,
-                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                    );
-                    Ok(stream.map_err(Into::into).boxed())
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) streaming reader
+                        let s = s.map_err(DataFusionError::from);
+                        let decoder = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build_decoder()?;
+                        let input =
+                            file_compression_type.convert_stream(s.boxed())?.fuse();
+                        let stream = deserialize_stream(
+                            input,
+                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
+                        );
+                        Ok(stream.map_err(Into::into).boxed())
+                    } else {
+                        // JSON array format: collect all bytes first
+                        let bytes = s
+                            .map_err(DataFusionError::from)
+                            .try_fold(Vec::new(), |mut acc, chunk| async move {
+                                acc.extend_from_slice(&chunk);
+                                Ok(acc)
+                            })
+                            .await?;
+                        let decompressed = file_compression_type
+                            .convert_read(std::io::Cursor::new(bytes))?;
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(decompressed),
+                            schema,
+                            batch_size,
+                        )?;
+                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    }
                 }
             }
         }))
     }
 }
 
+/// Read JSON array format and convert to RecordBatches.
+///
+/// Parses a JSON array `[{...}, {...}, ...]` and converts each object
+/// to Arrow RecordBatches using the provided schema.
+fn read_json_array_to_batches<R: Read>(
+    mut reader: R,
+    schema: SchemaRef,
+    batch_size: usize,
+) -> Result<Vec<RecordBatch>> {
+    let mut content = String::new();
+    reader.read_to_string(&mut content)?;
+
+    // Parse JSON array
+    let values: Vec<serde_json::Value> = serde_json::from_str(&content)

Review Comment:
   This method looks very memory-hungry: it reads the entire file into a `String` and then parses the whole JSON array into a `Vec<serde_json::Value>`, so peak memory is several times the input size. It should use streaming parsing instead (e.g. `serde_json`'s incremental deserialization), decoding one array element at a time and emitting batches as it goes.
   Here is a demo you could use for inspiration:
https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=e5a75081c5623eefc8a5eecd07f8e924



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to