This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch fix/sort-merge-reservation-starvation
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit 95225089dfd9e6250acec41573d40183cd7de91c
Author: Qi Zhu <[email protected]>
AuthorDate: Tue Feb 3 14:39:35 2026 +0800

    Redesign  json array streaming for datafusion (#31)
---
 Cargo.lock                                         |   3 +
 Cargo.toml                                         |   1 +
 datafusion-examples/examples/csv_json_opener.rs    |   2 +-
 datafusion/common/src/config.rs                    |  33 +-
 datafusion/core/src/dataframe/mod.rs               |   2 +-
 datafusion/core/src/datasource/file_format/json.rs | 383 +++++------
 .../core/src/datasource/file_format/options.rs     |  65 +-
 datafusion/core/src/datasource/listing/table.rs    |   3 +-
 .../core/src/datasource/physical_plan/json.rs      |  12 +-
 datafusion/core/src/execution/context/json.rs      |  10 +-
 datafusion/core/src/prelude.rs                     |   2 +-
 datafusion/core/tests/dataframe/mod.rs             |   9 +-
 datafusion/datasource-json/Cargo.toml              |   4 +-
 datafusion/datasource-json/src/file_format.rs      | 147 +++--
 datafusion/datasource-json/src/mod.rs              |   1 +
 datafusion/datasource-json/src/source.rs           | 586 +++++++++++++---
 datafusion/datasource-json/src/utils.rs            | 734 +++++++++++++++++++++
 .../proto-common/proto/datafusion_common.proto     |   3 +-
 datafusion/proto-common/src/from_proto/mod.rs      |   3 +-
 datafusion/proto-common/src/generated/pbjson.rs    |  46 +-
 datafusion/proto-common/src/generated/prost.rs     |   9 +-
 datafusion/proto-common/src/to_proto/mod.rs        |   3 +-
 .../proto/src/generated/datafusion_proto_common.rs |   9 +-
 datafusion/proto/src/logical_plan/file_formats.rs  |   6 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    |   4 +-
 datafusion/sqllogictest/test_files/json.slt        |  10 +-
 26 files changed, 1616 insertions(+), 474 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2cfdd82851..53dd7b5071 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2155,9 +2155,11 @@ dependencies = [
  "datafusion-physical-plan",
  "datafusion-session",
  "futures",
+ "log",
  "object_store",
  "serde_json",
  "tokio",
+ "tokio-stream",
 ]
 
 [[package]]
@@ -6469,6 +6471,7 @@ dependencies = [
  "futures-core",
  "pin-project-lite",
  "tokio",
+ "tokio-util",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 36198430e4..53fbff1712 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -181,6 +181,7 @@ tempfile = "3"
 testcontainers = { version = "0.25.2", features = ["default"] }
 testcontainers-modules = { version = "0.13" }
 tokio = { version = "1.48", features = ["macros", "rt", "sync"] }
+tokio-stream = "0.1"
 url = "2.5.7"
 
 [workspace.lints.clippy]
diff --git a/datafusion-examples/examples/csv_json_opener.rs 
b/datafusion-examples/examples/csv_json_opener.rs
index e7b809c82e..8a24c2c6e1 100644
--- a/datafusion-examples/examples/csv_json_opener.rs
+++ b/datafusion-examples/examples/csv_json_opener.rs
@@ -121,7 +121,7 @@ async fn json_opener() -> Result<()> {
         projected,
         FileCompressionType::UNCOMPRESSED,
         Arc::new(object_store),
-        false,
+        true,
     );
 
     let scan_config = FileScanConfigBuilder::new(
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 68022c2f06..19254e9436 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -2813,23 +2813,22 @@ config_namespace! {
     pub struct JsonOptions {
         pub compression: CompressionTypeVariant, default = 
CompressionTypeVariant::UNCOMPRESSED
         pub schema_infer_max_rec: Option<usize>, default = None
-        pub compression_level: Option<usize>, default = None
-        /// The format of JSON input files.
-        ///
-        /// When `false` (default), expects newline-delimited JSON (NDJSON):
-        /// ```text
-        /// {"key1": 1, "key2": "val"}
-        /// {"key1": 2, "key2": "vals"}
-        /// ```
-        ///
-        /// When `true`, expects JSON array format:
-        /// ```text
-        /// [
-        ///     {"key1": 1, "key2": "val"},
-        ///     {"key1": 2, "key2": "vals"}
-        /// ]
-        /// ```
-        pub format_array: bool, default = false
+       /// The JSON format to use when reading files.
+       ///
+       /// When `true` (default), expects newline-delimited JSON (NDJSON):
+       /// ```text
+       /// {"key1": 1, "key2": "val"}
+       /// {"key1": 2, "key2": "vals"}
+       /// ```
+       ///
+       /// When `false`, expects JSON array format:
+       /// ```text
+       /// [
+       ///   {"key1": 1, "key2": "val"},
+       ///   {"key1": 2, "key2": "vals"}
+       /// ]
+       /// ```
+       pub newline_delimited: bool, default = true
     }
 }
 
diff --git a/datafusion/core/src/dataframe/mod.rs 
b/datafusion/core/src/dataframe/mod.rs
index aa378d4262..a81a74fffa 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -479,7 +479,7 @@ impl DataFrame {
     /// # #[tokio::main]
     /// # async fn main() -> Result<()> {
     /// let ctx = SessionContext::new();
-    /// let df = ctx.read_json("tests/data/unnest.json", 
NdJsonReadOptions::default()).await?;
+    /// let df = ctx.read_json("tests/data/unnest.json", 
JsonReadOptions::default()).await?;
     /// // expand into multiple columns if it's json array, flatten field name 
if it's nested structure
     /// let df = df.unnest_columns(&["b","c","d"])?;
     /// let expected = vec![
diff --git a/datafusion/core/src/datasource/file_format/json.rs 
b/datafusion/core/src/datasource/file_format/json.rs
index cce8ead46c..b53fdad5db 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -25,7 +25,7 @@ mod tests {
     use super::*;
 
     use crate::datasource::file_format::test_util::scan_format;
-    use crate::prelude::{NdJsonReadOptions, SessionConfig, SessionContext};
+    use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::array::RecordBatch;
     use arrow_schema::Schema;
@@ -46,6 +46,7 @@ mod tests {
     use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
 
+    use crate::execution::options::JsonReadOptions;
     use datafusion_common::Result;
     use datafusion_datasource::file_compression_type::FileCompressionType;
     use futures::StreamExt;
@@ -53,6 +54,46 @@ mod tests {
     use object_store::local::LocalFileSystem;
     use regex::Regex;
     use rstest::rstest;
+    // ==================== Test Helpers ====================
+
+    /// Create a temporary JSON file and return (TempDir, path)
+    fn create_temp_json(content: &str) -> (tempfile::TempDir, String) {
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let path = format!("{}/test.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(&path, content).unwrap();
+        (tmp_dir, path)
+    }
+
+    /// Infer schema from JSON array format file
+    async fn infer_json_array_schema(
+        content: &str,
+    ) -> Result<arrow::datatypes::SchemaRef> {
+        let (_tmp_dir, path) = create_temp_json(content);
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+        let format = JsonFormat::default().with_newline_delimited(false);
+        format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await
+    }
+
+    /// Register a JSON array table and run a query
+    async fn query_json_array(content: &str, query: &str) -> 
Result<Vec<RecordBatch>> {
+        let (_tmp_dir, path) = create_temp_json(content);
+        let ctx = SessionContext::new();
+        let options = JsonReadOptions::default().newline_delimited(false);
+        ctx.register_json("test_table", &path, options).await?;
+        ctx.sql(query).await?.collect().await
+    }
+
+    /// Register a JSON array table and run a query, return formatted string
+    async fn query_json_array_str(content: &str, query: &str) -> 
Result<String> {
+        let result = query_json_array(content, query).await?;
+        Ok(batches_to_string(&result))
+    }
+
+    // ==================== Existing Tests ====================
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
@@ -209,7 +250,7 @@ mod tests {
         let ctx = SessionContext::new_with_config(config);
 
         let table_path = "tests/data/1.json";
-        let options = NdJsonReadOptions::default();
+        let options = JsonReadOptions::default();
 
         ctx.register_json("json_parallel", table_path, options)
             .await?;
@@ -241,7 +282,7 @@ mod tests {
         let ctx = SessionContext::new_with_config(config);
 
         let table_path = "tests/data/empty.json";
-        let options = NdJsonReadOptions::default();
+        let options = JsonReadOptions::default();
 
         ctx.register_json("json_parallel_empty", table_path, options)
             .await?;
@@ -315,7 +356,6 @@ mod tests {
             .digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 
}"#.into());
 
         let mut all_batches = RecordBatch::new_empty(schema.clone());
-        // We get RequiresMoreData after 2 batches because of how 
json::Decoder works
         for _ in 0..2 {
             let output = deserializer.next()?;
             let DeserializerOutput::RecordBatch(batch) = output else {
@@ -359,7 +399,6 @@ mod tests {
         let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
         df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), 
None)
             .await?;
-        // Expected the file to exist and be empty
         assert!(std::path::Path::new(&path).exists());
         let metadata = std::fs::metadata(&path)?;
         assert_eq!(metadata.len(), 0);
@@ -386,280 +425,210 @@ mod tests {
         let df = ctx.read_batch(empty_batch.clone())?;
         df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), 
None)
             .await?;
-        // Expected the file to exist and be empty
         assert!(std::path::Path::new(&path).exists());
         let metadata = std::fs::metadata(&path)?;
         assert_eq!(metadata.len(), 0);
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_json_array_format() -> Result<()> {
-        let session = SessionContext::new();
-        let ctx = session.state();
-        let store = Arc::new(LocalFileSystem::new()) as _;
-
-        // Create a temporary file with JSON array format
-        let tmp_dir = tempfile::TempDir::new()?;
-        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
-        std::fs::write(
-            &path,
-            r#"[
-                {"a": 1, "b": 2.0, "c": true},
-                {"a": 2, "b": 3.5, "c": false},
-                {"a": 3, "b": 4.0, "c": true}
-            ]"#,
-        )?;
+    // ==================== JSON Array Format Tests ====================
 
-        // Test with format_array = true
-        let format = JsonFormat::default().with_format_array(true);
-        let file_schema = format
-            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
-            .await
-            .expect("Schema inference");
+    #[tokio::test]
+    async fn test_json_array_schema_inference() -> Result<()> {
+        let schema = infer_json_array_schema(
+            r#"[{"a": 1, "b": 2.0, "c": true}, {"a": 2, "b": 3.5, "c": 
false}]"#,
+        )
+        .await?;
 
-        let fields = file_schema
+        let fields: Vec<_> = schema
             .fields()
             .iter()
             .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
-            .collect::<Vec<_>>();
+            .collect();
         assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
-
         Ok(())
     }
 
     #[tokio::test]
-    async fn test_json_array_format_empty() -> Result<()> {
-        let session = SessionContext::new();
-        let ctx = session.state();
-        let store = Arc::new(LocalFileSystem::new()) as _;
-
-        let tmp_dir = tempfile::TempDir::new()?;
-        let path = format!("{}/empty_array.json", 
tmp_dir.path().to_string_lossy());
-        std::fs::write(&path, "[]")?;
-
-        let format = JsonFormat::default().with_format_array(true);
-        let result = format
-            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
-            .await;
-
-        assert!(result.is_err());
-        assert!(result
-            .unwrap_err()
-            .to_string()
-            .contains("JSON array is empty"));
-
+    async fn test_json_array_empty() -> Result<()> {
+        let schema = infer_json_array_schema("[]").await?;
+        assert_eq!(schema.fields().len(), 0);
         Ok(())
     }
 
     #[tokio::test]
-    async fn test_json_array_format_with_limit() -> Result<()> {
-        let session = SessionContext::new();
-        let ctx = session.state();
-        let store = Arc::new(LocalFileSystem::new()) as _;
-
-        let tmp_dir = tempfile::TempDir::new()?;
-        let path = format!("{}/array_limit.json", 
tmp_dir.path().to_string_lossy());
-        std::fs::write(
-            &path,
-            r#"[
-                {"a": 1},
-                {"a": 2, "b": "extra"}
-            ]"#,
-        )?;
-
-        // Only infer from first record
-        let format = JsonFormat::default()
-            .with_format_array(true)
-            .with_schema_infer_max_rec(1);
-
-        let file_schema = format
-            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
-            .await
-            .expect("Schema inference");
-
-        // Should only have field "a" since we limited to 1 record
-        let fields = file_schema
-            .fields()
-            .iter()
-            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
-            .collect::<Vec<_>>();
-        assert_eq!(vec!["a: Int64"], fields);
+    async fn test_json_array_nested_struct() -> Result<()> {
+        let schema = infer_json_array_schema(
+            r#"[{"id": 1, "info": {"name": "Alice", "age": 30}}]"#,
+        )
+        .await?;
 
+        let info_field = schema.field_with_name("info").unwrap();
+        assert!(matches!(info_field.data_type(), DataType::Struct(_)));
         Ok(())
     }
 
     #[tokio::test]
-    async fn test_json_array_format_read_data() -> Result<()> {
-        let session = SessionContext::new();
-        let ctx = session.state();
-        let task_ctx = ctx.task_ctx();
-        let store = Arc::new(LocalFileSystem::new()) as _;
-
-        // Create a temporary file with JSON array format
-        let tmp_dir = tempfile::TempDir::new()?;
-        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
-        std::fs::write(
-            &path,
-            r#"[
-            {"a": 1, "b": 2.0, "c": true},
-            {"a": 2, "b": 3.5, "c": false},
-            {"a": 3, "b": 4.0, "c": true}
-        ]"#,
-        )?;
+    async fn test_json_array_list_type() -> Result<()> {
+        let schema =
+            infer_json_array_schema(r#"[{"id": 1, "tags": ["a", "b", 
"c"]}]"#).await?;
 
-        let format = JsonFormat::default().with_format_array(true);
-
-        // Infer schema
-        let file_schema = format
-            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
-            .await?;
+        let tags_field = schema.field_with_name("tags").unwrap();
+        assert!(matches!(tags_field.data_type(), DataType::List(_)));
+        Ok(())
+    }
 
-        // Scan and read data
-        let exec = scan_format(
-            &ctx,
-            &format,
-            Some(file_schema),
-            tmp_dir.path().to_str().unwrap(),
-            "array.json",
-            None,
-            None,
+    #[tokio::test]
+    async fn test_json_array_basic_query() -> Result<()> {
+        let result = query_json_array_str(
+            r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}, {"a": 3, "b": 
"test"}]"#,
+            "SELECT a, b FROM test_table ORDER BY a",
         )
         .await?;
-        let batches = collect(exec, task_ctx).await?;
-
-        assert_eq!(1, batches.len());
-        assert_eq!(3, batches[0].num_columns());
-        assert_eq!(3, batches[0].num_rows());
-
-        // Verify data
-        let array_a = as_int64_array(batches[0].column(0))?;
-        assert_eq!(
-            vec![1, 2, 3],
-            (0..3).map(|i| array_a.value(i)).collect::<Vec<_>>()
-        );
 
+        assert_snapshot!(result, @r"
+        +---+-------+
+        | a | b     |
+        +---+-------+
+        | 1 | hello |
+        | 2 | world |
+        | 3 | test  |
+        +---+-------+
+        ");
         Ok(())
     }
 
     #[tokio::test]
-    async fn test_json_array_format_with_projection() -> Result<()> {
-        let session = SessionContext::new();
-        let ctx = session.state();
-        let task_ctx = ctx.task_ctx();
-        let store = Arc::new(LocalFileSystem::new()) as _;
-
-        let tmp_dir = tempfile::TempDir::new()?;
-        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
-        std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": 
"world"}]"#)?;
-
-        let format = JsonFormat::default().with_format_array(true);
-        let file_schema = format
-            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+    async fn test_json_array_with_nulls() -> Result<()> {
+        let result = query_json_array_str(
+            r#"[{"id": 1, "name": "Alice"}, {"id": 2, "name": null}, {"id": 3, 
"name": "Charlie"}]"#,
+            "SELECT id, name FROM test_table ORDER BY id",
+        )
             .await?;
 
-        // Project only column "a"
-        let exec = scan_format(
-            &ctx,
-            &format,
-            Some(file_schema),
-            tmp_dir.path().to_str().unwrap(),
-            "array.json",
-            Some(vec![0]),
-            None,
+        assert_snapshot!(result, @r"
+        +----+---------+
+        | id | name    |
+        +----+---------+
+        | 1  | Alice   |
+        | 2  |         |
+        | 3  | Charlie |
+        +----+---------+
+        ");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_unnest() -> Result<()> {
+        let result = query_json_array_str(
+            r#"[{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 
50]}]"#,
+            "SELECT id, unnest(values) as value FROM test_table ORDER BY id, 
value",
         )
         .await?;
-        let batches = collect(exec, task_ctx).await?;
-
-        assert_eq!(1, batches.len());
-        assert_eq!(1, batches[0].num_columns()); // Only 1 column projected
-        assert_eq!(2, batches[0].num_rows());
 
+        assert_snapshot!(result, @r"
+        +----+-------+
+        | id | value |
+        +----+-------+
+        | 1  | 10    |
+        | 1  | 20    |
+        | 1  | 30    |
+        | 2  | 40    |
+        | 2  | 50    |
+        +----+-------+
+        ");
         Ok(())
     }
 
     #[tokio::test]
-    async fn test_ndjson_read_options_format_array() -> Result<()> {
-        let ctx = SessionContext::new();
-
-        // Create a temporary file with JSON array format
-        let tmp_dir = tempfile::TempDir::new()?;
-        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
-        std::fs::write(
-            &path,
-            r#"[
-            {"a": 1, "b": "hello"},
-            {"a": 2, "b": "world"},
-            {"a": 3, "b": "test"}
-        ]"#,
-        )?;
-
-        // Use NdJsonReadOptions with format_array = true
-        let options = NdJsonReadOptions::default().format_array(true);
-
-        ctx.register_json("json_array_table", &path, options)
+    async fn test_json_array_unnest_struct() -> Result<()> {
+        let result = query_json_array_str(
+            r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": 
"B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#,
+            "SELECT id, unnest(orders)['product'] as product, 
unnest(orders)['qty'] as qty FROM test_table ORDER BY id, product",
+        )
             .await?;
 
-        let result = ctx
-            .sql("SELECT a, b FROM json_array_table ORDER BY a")
-            .await?
-            .collect()
-            .await?;
+        assert_snapshot!(result, @r"
+        +----+---------+-----+
+        | id | product | qty |
+        +----+---------+-----+
+        | 1  | A       | 2   |
+        | 1  | B       | 3   |
+        | 2  | C       | 1   |
+        +----+---------+-----+
+        ");
+        Ok(())
+    }
 
-        assert_snapshot!(batches_to_string(&result), @r"
-    +---+-------+
-    | a | b     |
-    +---+-------+
-    | 1 | hello |
-    | 2 | world |
-    | 3 | test  |
-    +---+-------+
-    ");
+    #[tokio::test]
+    async fn test_json_array_nested_struct_access() -> Result<()> {
+        let result = query_json_array_str(
+            r#"[{"id": 1, "dept": {"name": "Engineering", "head": "Alice"}}, 
{"id": 2, "dept": {"name": "Sales", "head": "Bob"}}]"#,
+            "SELECT id, dept['name'] as dept_name, dept['head'] as head FROM 
test_table ORDER BY id",
+        )
+            .await?;
 
+        assert_snapshot!(result, @r"
+        +----+-------------+-------+
+        | id | dept_name   | head  |
+        +----+-------------+-------+
+        | 1  | Engineering | Alice |
+        | 2  | Sales       | Bob   |
+        +----+-------------+-------+
+        ");
         Ok(())
     }
 
     #[tokio::test]
-    async fn test_ndjson_read_options_format_array_with_compression() -> 
Result<()> {
+    async fn test_json_array_with_compression() -> Result<()> {
         use flate2::write::GzEncoder;
         use flate2::Compression;
         use std::io::Write;
 
-        let ctx = SessionContext::new();
-
-        // Create a temporary gzip compressed JSON array file
         let tmp_dir = tempfile::TempDir::new()?;
         let path = format!("{}/array.json.gz", 
tmp_dir.path().to_string_lossy());
 
-        let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": 
"world"}]"#;
         let file = std::fs::File::create(&path)?;
         let mut encoder = GzEncoder::new(file, Compression::default());
-        encoder.write_all(json_content.as_bytes())?;
+        encoder.write_all(
+            r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#.as_bytes(),
+        )?;
         encoder.finish()?;
 
-        // Use NdJsonReadOptions with format_array and GZIP compression
-        let options = NdJsonReadOptions::default()
-            .format_array(true)
+        let ctx = SessionContext::new();
+        let options = JsonReadOptions::default()
+            .newline_delimited(false)
             .file_compression_type(FileCompressionType::GZIP)
             .file_extension(".json.gz");
 
-        ctx.register_json("json_array_gzip", &path, options).await?;
-
+        ctx.register_json("test_table", &path, options).await?;
         let result = ctx
-            .sql("SELECT a, b FROM json_array_gzip ORDER BY a")
+            .sql("SELECT a, b FROM test_table ORDER BY a")
             .await?
             .collect()
             .await?;
 
         assert_snapshot!(batches_to_string(&result), @r"
-    +---+-------+
-    | a | b     |
-    +---+-------+
-    | 1 | hello |
-    | 2 | world |
-    +---+-------+
-    ");
+        +---+-------+
+        | a | b     |
+        +---+-------+
+        | 1 | hello |
+        | 2 | world |
+        +---+-------+
+        ");
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_list_of_structs() -> Result<()> {
+        let batches = query_json_array(
+            r#"[{"id": 1, "items": [{"name": "x", "price": 10.5}]}, {"id": 2, 
"items": []}]"#,
+            "SELECT id, items FROM test_table ORDER BY id",
+        )
+            .await?;
 
+        assert_eq!(1, batches.len());
+        assert_eq!(2, batches[0].num_rows());
         Ok(())
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/options.rs 
b/datafusion/core/src/datasource/file_format/options.rs
index fb707ed3d6..7c9777d014 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -442,14 +442,23 @@ impl<'a> AvroReadOptions<'a> {
     }
 }
 
-/// Options that control the reading of Line-delimited JSON files (NDJson)
+#[deprecated(
+    since = "53.0.0",
+    note = "Use `JsonReadOptions` instead. This alias will be removed in a 
future version."
+)]
+#[doc = "Deprecated: Use [`JsonReadOptions`] instead."]
+pub type NdJsonReadOptions<'a> = JsonReadOptions<'a>;
+
+/// Options that control the reading of JSON files.
+///
+/// Supports both newline-delimited JSON (NDJSON) and JSON array formats.
 ///
 /// Note this structure is supplied when a datasource is created and
-/// can not not vary from statement to statement. For settings that
+/// can not vary from statement to statement. For settings that
 /// can vary statement to statement see
 /// [`ConfigOptions`](crate::config::ConfigOptions).
 #[derive(Clone)]
-pub struct NdJsonReadOptions<'a> {
+pub struct JsonReadOptions<'a> {
     /// The data source schema.
     pub schema: Option<&'a Schema>,
     /// Max number of rows to read from JSON files for schema inference if 
needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
@@ -465,12 +474,25 @@ pub struct NdJsonReadOptions<'a> {
     pub infinite: bool,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<SortExpr>>,
-    /// Whether the JSON file is in array format `[{...}, {...}]` instead of
-    /// line-delimited format. Defaults to `false`.
-    pub format_array: bool,
+    /// Whether to read as newline-delimited JSON (default: true).
+    ///
+    /// When `true` (default), expects newline-delimited JSON (NDJSON):
+    /// ```text
+    /// {"key1": 1, "key2": "val"}
+    /// {"key1": 2, "key2": "vals"}
+    /// ```
+    ///
+    /// When `false`, expects JSON array format:
+    /// ```text
+    /// [
+    ///   {"key1": 1, "key2": "val"},
+    ///   {"key1": 2, "key2": "vals"}
+    /// ]
+    /// ```
+    pub newline_delimited: bool,
 }
 
-impl Default for NdJsonReadOptions<'_> {
+impl Default for JsonReadOptions<'_> {
     fn default() -> Self {
         Self {
             schema: None,
@@ -480,12 +502,12 @@ impl Default for NdJsonReadOptions<'_> {
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             infinite: false,
             file_sort_order: vec![],
-            format_array: false,
+            newline_delimited: true,
         }
     }
 }
 
-impl<'a> NdJsonReadOptions<'a> {
+impl<'a> JsonReadOptions<'a> {
     /// Specify table_partition_cols for partition pruning
     pub fn table_partition_cols(
         mut self,
@@ -534,10 +556,23 @@ impl<'a> NdJsonReadOptions<'a> {
         self
     }
 
-    /// Specify whether the JSON file is in array format `[{...}, {...}]`
-    /// instead of line-delimited format.
-    pub fn format_array(mut self, format_array: bool) -> Self {
-        self.format_array = format_array;
+    /// Set whether to read as newline-delimited JSON.
+    ///
+    /// When `true` (default), expects newline-delimited JSON (NDJSON):
+    /// ```text
+    /// {"key1": 1, "key2": "val"}
+    /// {"key1": 2, "key2": "vals"}
+    /// ```
+    ///
+    /// When `false`, expects JSON array format:
+    /// ```text
+    /// [
+    ///   {"key1": 1, "key2": "val"},
+    ///   {"key1": 2, "key2": "vals"}
+    /// ]
+    /// ```
+    pub fn newline_delimited(mut self, newline_delimited: bool) -> Self {
+        self.newline_delimited = newline_delimited;
         self
     }
 }
@@ -665,7 +700,7 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
 }
 
 #[async_trait]
-impl ReadOptions<'_> for NdJsonReadOptions<'_> {
+impl ReadOptions<'_> for JsonReadOptions<'_> {
     fn to_listing_options(
         &self,
         config: &SessionConfig,
@@ -675,7 +710,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
             .with_options(table_options.json)
             .with_schema_infer_max_rec(self.schema_infer_max_records)
             .with_file_compression_type(self.file_compression_type.to_owned())
-            .with_format_array(self.format_array);
+            .with_newline_delimited(self.newline_delimited);
 
         ListingOptions::new(Arc::new(file_format))
             .with_file_extension(self.file_extension)
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 3333b70676..06af40f821 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -110,6 +110,7 @@ mod tests {
     #[cfg(feature = "parquet")]
     use crate::datasource::file_format::parquet::ParquetFormat;
     use crate::datasource::listing::table::ListingTableConfigExt;
+    use crate::execution::options::JsonReadOptions;
     use crate::prelude::*;
     use crate::{
         datasource::{
@@ -806,7 +807,7 @@ mod tests {
                     .register_json(
                         "t",
                         tmp_dir.path().to_str().unwrap(),
-                        NdJsonReadOptions::default()
+                        JsonReadOptions::default()
                             .schema(schema.as_ref())
                             .file_compression_type(file_compression_type),
                     )
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs 
b/datafusion/core/src/datasource/physical_plan/json.rs
index f7d5c710bf..6701bab332 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -32,7 +32,7 @@ mod tests {
 
     use crate::dataframe::DataFrameWriteOptions;
     use crate::execution::SessionState;
-    use crate::prelude::{CsvReadOptions, NdJsonReadOptions, SessionContext};
+    use crate::prelude::{CsvReadOptions, JsonReadOptions, SessionContext};
     use crate::test::partitioned_file_groups;
     use datafusion_common::cast::{as_int32_array, as_int64_array, 
as_string_array};
     use datafusion_common::test_util::batches_to_string;
@@ -132,7 +132,7 @@ mod tests {
             .get_ext_with_compression(&file_compression_type)
             .unwrap();
 
-        let read_options = NdJsonReadOptions::default()
+        let read_options = JsonReadOptions::default()
             .file_extension(ext.as_str())
             .file_compression_type(file_compression_type.to_owned());
         let frame = ctx.read_json(path, read_options).await.unwrap();
@@ -384,7 +384,7 @@ mod tests {
         let path = format!("{TEST_DATA_BASE}/1.json");
 
         // register json file with the execution context
-        ctx.register_json("test", path.as_str(), NdJsonReadOptions::default())
+        ctx.register_json("test", path.as_str(), JsonReadOptions::default())
             .await?;
 
         // register a local file system object store for /tmp directory
@@ -426,7 +426,7 @@ mod tests {
         }
 
         // register each partition as well as the top level dir
-        let json_read_option = NdJsonReadOptions::default();
+        let json_read_option = JsonReadOptions::default();
         ctx.register_json(
             "part0",
             &format!("{out_dir}/{part_0_name}"),
@@ -503,7 +503,7 @@ mod tests {
         async fn read_test_data(schema_infer_max_records: usize) -> 
Result<SchemaRef> {
             let ctx = SessionContext::new();
 
-            let options = NdJsonReadOptions {
+            let options = JsonReadOptions {
                 schema_infer_max_records,
                 ..Default::default()
             };
@@ -579,7 +579,7 @@ mod tests {
             .get_ext_with_compression(&file_compression_type)
             .unwrap();
 
-        let read_option = NdJsonReadOptions::default()
+        let read_option = JsonReadOptions::default()
             .file_compression_type(file_compression_type)
             .file_extension(ext.as_str());
 
diff --git a/datafusion/core/src/execution/context/json.rs 
b/datafusion/core/src/execution/context/json.rs
index e9d7994008..f7df2ad7a1 100644
--- a/datafusion/core/src/execution/context/json.rs
+++ b/datafusion/core/src/execution/context/json.rs
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use super::super::options::ReadOptions;
+use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext};
+use crate::execution::options::JsonReadOptions;
 use datafusion_common::TableReference;
 use datafusion_datasource_json::source::plan_to_json;
 use std::sync::Arc;
 
-use super::super::options::{NdJsonReadOptions, ReadOptions};
-use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext};
-
 impl SessionContext {
     /// Creates a [`DataFrame`] for reading an JSON data source.
     ///
@@ -32,7 +32,7 @@ impl SessionContext {
     pub async fn read_json<P: DataFilePaths>(
         &self,
         table_paths: P,
-        options: NdJsonReadOptions<'_>,
+        options: JsonReadOptions<'_>,
     ) -> Result<DataFrame> {
         self._read_type(table_paths, options).await
     }
@@ -43,7 +43,7 @@ impl SessionContext {
         &self,
         table_ref: impl Into<TableReference>,
         table_path: impl AsRef<str>,
-        options: NdJsonReadOptions<'_>,
+        options: JsonReadOptions<'_>,
     ) -> Result<()> {
         let listing_options = options
             .to_listing_options(&self.copied_config(), 
self.copied_table_options());
diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs
index d723620d32..a94716aed7 100644
--- a/datafusion/core/src/prelude.rs
+++ b/datafusion/core/src/prelude.rs
@@ -29,7 +29,7 @@ pub use crate::dataframe;
 pub use crate::dataframe::DataFrame;
 pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext};
 pub use crate::execution::options::{
-    AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
+    AvroReadOptions, CsvReadOptions, JsonReadOptions, ParquetReadOptions,
 };
 
 pub use datafusion_common::Column;
diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index 4d52345a2a..e4d8f10d71 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -56,9 +56,7 @@ use datafusion::error::Result;
 use datafusion::execution::context::SessionContext;
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::logical_expr::{ColumnarValue, Volatility};
-use datafusion::prelude::{
-    CsvReadOptions, JoinType, NdJsonReadOptions, ParquetReadOptions,
-};
+use datafusion::prelude::{CsvReadOptions, JoinType, ParquetReadOptions};
 use datafusion::test_util::{
     parquet_test_data, populate_csv_partitions, register_aggregate_csv, 
test_table,
     test_table_with_name,
@@ -93,6 +91,7 @@ use datafusion_physical_plan::empty::EmptyExec;
 use datafusion_physical_plan::{displayable, ExecutionPlan, 
ExecutionPlanProperties};
 
 use datafusion::error::Result as DataFusionResult;
+use datafusion::execution::options::JsonReadOptions;
 use datafusion_functions_window::expr_fn::lag;
 
 // Get string representation of the plan
@@ -2767,7 +2766,7 @@ async fn write_json_with_order() -> Result<()> {
     ctx.register_json(
         "data",
         test_path.to_str().unwrap(),
-        NdJsonReadOptions::default().schema(&schema),
+        JsonReadOptions::default().schema(&schema),
     )
     .await?;
 
@@ -6240,7 +6239,7 @@ async fn register_non_json_file() {
         .register_json(
             "data",
             "tests/data/test_binary.parquet",
-            NdJsonReadOptions::default(),
+            JsonReadOptions::default(),
         )
         .await;
     assert_contains!(
diff --git a/datafusion/datasource-json/Cargo.toml 
b/datafusion/datasource-json/Cargo.toml
index 168ae8880e..480b4fd7ba 100644
--- a/datafusion/datasource-json/Cargo.toml
+++ b/datafusion/datasource-json/Cargo.toml
@@ -43,9 +43,11 @@ datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 datafusion-session = { workspace = true }
 futures = { workspace = true }
+log = "0.4.28"
 object_store = { workspace = true }
-serde_json = { workspace = true }
+serde_json = "1.0.145"
 tokio = { workspace = true }
+tokio-stream = { workspace = true, features = ["sync"] }
 
 # Note: add additional linter rules in lib.rs.
 # Rust does not support workspace + new linter rules in subcrates yet
diff --git a/datafusion/datasource-json/src/file_format.rs 
b/datafusion/datasource-json/src/file_format.rs
index 589ad6bea9..b2a3dca9f2 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -31,6 +31,7 @@ use arrow::datatypes::{Schema, SchemaRef};
 use arrow::error::ArrowError;
 use arrow::json;
 use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
+use bytes::{Buf, Bytes};
 use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
 use datafusion_common::file_options::json_writer::JsonWriterOptions;
 use datafusion_common::{
@@ -47,7 +48,6 @@ use datafusion_datasource::file_format::{
 use datafusion_datasource::file_scan_config::{FileScanConfig, 
FileScanConfigBuilder};
 use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
 use datafusion_datasource::sink::{DataSink, DataSinkExec};
-
 use datafusion_datasource::source::DataSourceExec;
 use datafusion_datasource::write::demux::DemuxedStreamReceiver;
 use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
@@ -58,8 +58,8 @@ use 
datafusion_physical_expr_common::sort_expr::LexRequirement;
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
 
+use crate::utils::JsonArrayToNdjsonReader;
 use async_trait::async_trait;
-use bytes::{Buf, Bytes};
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
 
 #[derive(Default)]
@@ -136,19 +136,22 @@ impl Debug for JsonFormatFactory {
 ///
 /// # Supported Formats
 ///
-/// ## Line-Delimited JSON (default)
+/// ## Line-Delimited JSON (default, `newline_delimited = true`)
 /// ```text
 /// {"key1": 1, "key2": "val"}
 /// {"key1": 2, "key2": "vals"}
 /// ```
 ///
-/// ## JSON Array Format (when `format_array` option is true)
+/// ## JSON Array Format (`newline_delimited = false`)
 /// ```text
 /// [
 ///     {"key1": 1, "key2": "val"},
 ///     {"key1": 2, "key2": "vals"}
 /// ]
 /// ```
+///
+/// Note: JSON array format is processed using streaming conversion,
+/// which is memory-efficient even for large files.
 #[derive(Debug, Default)]
 pub struct JsonFormat {
     options: JsonOptions,
@@ -183,48 +186,51 @@ impl JsonFormat {
         self
     }
 
-    /// Set whether to expect JSON array format instead of line-delimited 
format.
+    /// Set whether to read as newline-delimited JSON (NDJSON).
     ///
-    /// When `true`, expects input like: `[{"a": 1}, {"a": 2}]`
-    /// When `false` (default), expects input like:
+    /// When `true` (default), expects newline-delimited format:
     /// ```text
     /// {"a": 1}
     /// {"a": 2}
     /// ```
-    pub fn with_format_array(mut self, format_array: bool) -> Self {
-        self.options.format_array = format_array;
+    ///
+    /// When `false`, expects JSON array format:
+    /// ```text
+    /// [{"a": 1}, {"a": 2}]
+    /// ```
+    pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
+        self.options.newline_delimited = newline_delimited;
         self
     }
 }
 
-/// Infer schema from a JSON array format file.
+/// Infer schema from JSON array format using streaming conversion.
+///
+/// This function converts JSON array format to NDJSON on-the-fly and uses
+/// arrow-json's schema inference. It properly tracks the number of records
+/// processed for correct `records_to_read` management.
 ///
-/// This function reads JSON data in array format `[{...}, {...}]` and infers
-/// the Arrow schema from the contained objects.
-fn infer_json_schema_from_json_array<R: Read>(
-    reader: &mut R,
+/// # Returns
+/// A tuple of (Schema, records_consumed) where records_consumed is the
+/// number of records that were processed for schema inference.
+fn infer_schema_from_json_array<R: Read>(
+    reader: R,
     max_records: usize,
-) -> std::result::Result<Schema, ArrowError> {
-    let mut content = String::new();
-    reader.read_to_string(&mut content).map_err(|e| {
-        ArrowError::JsonError(format!("Failed to read JSON content: {e}"))
-    })?;
-
-    // Parse as JSON array using serde_json
-    let values: Vec<serde_json::Value> = serde_json::from_str(&content)
-        .map_err(|e| ArrowError::JsonError(format!("Failed to parse JSON 
array: {e}")))?;
+) -> Result<(Schema, usize)> {
+    let ndjson_reader = JsonArrayToNdjsonReader::new(reader);
 
-    // Take only max_records for schema inference
-    let values_to_infer: Vec<_> = 
values.into_iter().take(max_records).collect();
+    let iter = ValueIter::new(ndjson_reader, None);
+    let mut count = 0;
 
-    if values_to_infer.is_empty() {
-        return Err(ArrowError::JsonError(
-            "JSON array is empty, cannot infer schema".to_string(),
-        ));
-    }
+    let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
+        let should_take = count < max_records;
+        if should_take {
+            count += 1;
+        }
+        should_take
+    }))?;
 
-    // Use arrow's schema inference on the parsed values
-    infer_json_schema_from_iterator(values_to_infer.into_iter().map(Ok))
+    Ok((schema, count))
 }
 
 #[async_trait]
@@ -261,53 +267,67 @@ impl FileFormat for JsonFormat {
             .schema_infer_max_rec
             .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
         let file_compression_type = 
FileCompressionType::from(self.options.compression);
-        let is_array_format = self.options.format_array;
+        let newline_delimited = self.options.newline_delimited;
 
         for object in objects {
-            let mut take_while = || {
-                let should_take = records_to_read > 0;
-                if should_take {
-                    records_to_read -= 1;
-                }
-                should_take
-            };
+            // Early exit if we've read enough records
+            if records_to_read == 0 {
+                break;
+            }
 
             let r = store.as_ref().get(&object.location).await?;
-            let schema = match r.payload {
+
+            let (schema, records_consumed) = match r.payload {
                 #[cfg(not(target_arch = "wasm32"))]
                 GetResultPayload::File(file, _) => {
                     let decoder = file_compression_type.convert_read(file)?;
-                    let mut reader = BufReader::new(decoder);
-
-                    if is_array_format {
-                        infer_json_schema_from_json_array(&mut reader, 
records_to_read)?
+                    let reader = BufReader::new(decoder);
+
+                    if newline_delimited {
+                        // NDJSON: use ValueIter directly
+                        let iter = ValueIter::new(reader, None);
+                        let mut count = 0;
+                        let schema =
+                            
infer_json_schema_from_iterator(iter.take_while(|_| {
+                                let should_take = count < records_to_read;
+                                if should_take {
+                                    count += 1;
+                                }
+                                should_take
+                            }))?;
+                        (schema, count)
                     } else {
-                        let iter = ValueIter::new(&mut reader, None);
-                        infer_json_schema_from_iterator(
-                            iter.take_while(|_| take_while()),
-                        )?
+                        // JSON array format: use streaming converter
+                        infer_schema_from_json_array(reader, records_to_read)?
                     }
                 }
                 GetResultPayload::Stream(_) => {
                     let data = r.bytes().await?;
                     let decoder = 
file_compression_type.convert_read(data.reader())?;
-                    let mut reader = BufReader::new(decoder);
-
-                    if is_array_format {
-                        infer_json_schema_from_json_array(&mut reader, 
records_to_read)?
+                    let reader = BufReader::new(decoder);
+
+                    if newline_delimited {
+                        let iter = ValueIter::new(reader, None);
+                        let mut count = 0;
+                        let schema =
+                            
infer_json_schema_from_iterator(iter.take_while(|_| {
+                                let should_take = count < records_to_read;
+                                if should_take {
+                                    count += 1;
+                                }
+                                should_take
+                            }))?;
+                        (schema, count)
                     } else {
-                        let iter = ValueIter::new(&mut reader, None);
-                        infer_json_schema_from_iterator(
-                            iter.take_while(|_| take_while()),
-                        )?
+                        // JSON array format: use streaming converter
+                        infer_schema_from_json_array(reader, records_to_read)?
                     }
                 }
             };
 
             schemas.push(schema);
-            if records_to_read == 0 {
-                break;
-            }
+            // Correctly decrement records_to_read
+            records_to_read = records_to_read.saturating_sub(records_consumed);
         }
 
         let schema = Schema::try_merge(schemas)?;
@@ -329,8 +349,9 @@ impl FileFormat for JsonFormat {
         _state: &dyn Session,
         conf: FileScanConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let source =
-            
Arc::new(JsonSource::new().with_format_array(self.options.format_array));
+        let source = Arc::new(
+            
JsonSource::new().with_newline_delimited(self.options.newline_delimited),
+        );
         let conf = FileScanConfigBuilder::from(conf)
             .with_file_compression_type(FileCompressionType::from(
                 self.options.compression,
@@ -359,7 +380,7 @@ impl FileFormat for JsonFormat {
     }
 
     fn file_source(&self) -> Arc<dyn FileSource> {
-        
Arc::new(JsonSource::new().with_format_array(self.options.format_array))
+        
Arc::new(JsonSource::new().with_newline_delimited(self.options.newline_delimited))
     }
 }
 
diff --git a/datafusion/datasource-json/src/mod.rs 
b/datafusion/datasource-json/src/mod.rs
index 18bb8792c3..549393bb39 100644
--- a/datafusion/datasource-json/src/mod.rs
+++ b/datafusion/datasource-json/src/mod.rs
@@ -21,5 +21,6 @@
 
 pub mod file_format;
 pub mod source;
+pub mod utils;
 
 pub use file_format::*;
diff --git a/datafusion/datasource-json/src/source.rs 
b/datafusion/datasource-json/src/source.rs
index d1107b0b97..934d62ee39 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -15,17 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Execution plan for reading line-delimited JSON files
+//! Execution plan for reading JSON files (line-delimited and array formats)
 
 use std::any::Any;
 use std::io::{BufReader, Read, Seek, SeekFrom};
+use std::pin::Pin;
 use std::sync::Arc;
-use std::task::Poll;
+use std::task::{Context, Poll};
 
 use crate::file_format::JsonDecoder;
+use crate::utils::{ChannelReader, JsonArrayToNdjsonReader};
 
 use datafusion_common::error::{DataFusionError, Result};
-use datafusion_common_runtime::JoinSet;
+use datafusion_common_runtime::{JoinSet, SpawnedTask};
 use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
 use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
@@ -36,19 +38,65 @@ use datafusion_datasource::{
 };
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
-use arrow::array::RecordBatch;
 use arrow::json::ReaderBuilder;
-use arrow::{datatypes::SchemaRef, json};
+use arrow::{datatypes::SchemaRef, json, record_batch::RecordBatch};
 use datafusion_common::Statistics;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_execution::TaskContext;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 
+use futures::stream::Stream;
 use futures::{StreamExt, TryStreamExt};
 use object_store::buffered::BufWriter;
 use object_store::{GetOptions, GetResultPayload, ObjectStore};
 use tokio::io::AsyncWriteExt;
+use tokio_stream::wrappers::ReceiverStream;
+
+/// Channel buffer size for streaming JSON array processing.
+/// With ~128KB average chunk size, 128 chunks ≈ 16MB buffer.
+const CHANNEL_BUFFER_SIZE: usize = 128;
+
+/// Buffer size for JsonArrayToNdjsonReader (2MB each, 4MB total for 
input+output)
+const JSON_CONVERTER_BUFFER_SIZE: usize = 2 * 1024 * 1024;
+
+// ============================================================================
+// JsonArrayStream - Custom stream wrapper to hold SpawnedTask handles
+// ============================================================================
+
+/// A stream wrapper that holds SpawnedTask handles to keep them alive
+/// until the stream is fully consumed or dropped.
+///
+/// This ensures cancel-safety: when the stream is dropped, the tasks
+/// are properly aborted via SpawnedTask's Drop implementation.
+struct JsonArrayStream {
+    inner: ReceiverStream<std::result::Result<RecordBatch, 
arrow::error::ArrowError>>,
+    /// Task that reads from object store and sends bytes to channel.
+    /// Kept alive until stream is consumed or dropped.
+    _read_task: SpawnedTask<()>,
+    /// Task that parses JSON and sends RecordBatches.
+    /// Kept alive until stream is consumed or dropped.
+    _parse_task: SpawnedTask<()>,
+}
+
+impl Stream for JsonArrayStream {
+    type Item = std::result::Result<RecordBatch, arrow::error::ArrowError>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        Pin::new(&mut self.inner).poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.inner.size_hint()
+    }
+}
+
+// ============================================================================
+// JsonOpener and JsonSource
+// ============================================================================
 
 /// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`]
 pub struct JsonOpener {
@@ -56,24 +104,26 @@ pub struct JsonOpener {
     projected_schema: SchemaRef,
     file_compression_type: FileCompressionType,
     object_store: Arc<dyn ObjectStore>,
-    format_array: bool,
+    /// When `true` (default), expects newline-delimited JSON (NDJSON).
+    /// When `false`, expects JSON array format `[{...}, {...}]`.
+    newline_delimited: bool,
 }
 
 impl JsonOpener {
-    /// Returns a  [`JsonOpener`]
+    /// Returns a [`JsonOpener`]
     pub fn new(
         batch_size: usize,
         projected_schema: SchemaRef,
         file_compression_type: FileCompressionType,
         object_store: Arc<dyn ObjectStore>,
-        format_array: bool,
+        newline_delimited: bool,
     ) -> Self {
         Self {
             batch_size,
             projected_schema,
             file_compression_type,
             object_store,
-            format_array,
+            newline_delimited,
         }
     }
 }
@@ -85,18 +135,27 @@ pub struct JsonSource {
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
-    format_array: bool,
+    newline_delimited: bool,
 }
 
 impl JsonSource {
-    /// Initialize a JsonSource with default values
+    /// Initialize a JsonSource with the provided schema
     pub fn new() -> Self {
-        Self::default()
+        Self {
+            batch_size: None,
+            metrics: ExecutionPlanMetricsSet::new(),
+            projected_statistics: None,
+            schema_adapter_factory: None,
+            newline_delimited: true,
+        }
     }
 
-    /// Set whether to expect JSON array format
-    pub fn with_format_array(mut self, format_array: bool) -> Self {
-        self.format_array = format_array;
+    /// Set whether to read as newline-delimited JSON.
+    ///
+    /// When `true` (default), expects newline-delimited format.
+    /// When `false`, expects JSON array format `[{...}, {...}]`.
+    pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self {
+        self.newline_delimited = newline_delimited;
         self
     }
 }
@@ -121,8 +180,8 @@ impl FileSource for JsonSource {
             projected_schema: base_config.projected_file_schema(),
             file_compression_type: base_config.file_compression_type,
             object_store,
-            format_array: self.format_array,
-        })
+            newline_delimited: self.newline_delimited,
+        }) as Arc<dyn FileOpener>
     }
 
     fn as_any(&self) -> &dyn Any {
@@ -138,16 +197,17 @@ impl FileSource for JsonSource {
     fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(Self { ..self.clone() })
     }
+
+    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> 
{
+        Arc::new(Self { ..self.clone() })
+    }
+
     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
         conf.projected_statistics = Some(statistics);
         Arc::new(conf)
     }
 
-    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> 
{
-        Arc::new(Self { ..self.clone() })
-    }
-
     fn metrics(&self) -> &ExecutionPlanMetricsSet {
         &self.metrics
     }
@@ -179,7 +239,7 @@ impl FileSource for JsonSource {
 }
 
 impl FileOpener for JsonOpener {
-    /// Open a partitioned NDJSON file.
+    /// Open a partitioned JSON file.
     ///
     /// If `file_meta.range` is `None`, the entire file is opened.
     /// Else `file_meta.range` is `Some(FileRange{start, end})`, which 
corresponds to the byte range [start, end) within the file.
@@ -188,18 +248,20 @@ impl FileOpener for JsonOpener {
     /// are applied to determine which lines to read:
     /// 1. The first line of the partition is the line in which the index of 
the first character >= `start`.
     /// 2. The last line of the partition is the line in which the byte at 
position `end - 1` resides.
+    ///
+    /// Note: JSON array format does not support range-based scanning.
     fn open(&self, partitioned_file: PartitionedFile) -> 
Result<FileOpenFuture> {
         let store = Arc::clone(&self.object_store);
         let schema = Arc::clone(&self.projected_schema);
         let batch_size = self.batch_size;
         let file_compression_type = self.file_compression_type.to_owned();
-        let format_array = self.format_array;
+        let newline_delimited = self.newline_delimited;
 
         // JSON array format requires reading the complete file
-        if format_array && partitioned_file.range.is_some() {
+        if !newline_delimited && partitioned_file.range.is_some() {
             return Err(DataFusionError::NotImplemented(
                 "JSON array format does not support range-based file scanning. 
\
-                 Disable repartition_file_scans or use line-delimited JSON 
format."
+                 Disable repartition_file_scans or use newline-delimited JSON 
format."
                     .to_string(),
             ));
         }
@@ -235,46 +297,38 @@ impl FileOpener for JsonOpener {
                         Some(_) => {
                             file.seek(SeekFrom::Start(result.range.start as 
_))?;
                             let limit = result.range.end - result.range.start;
-                            file_compression_type.convert_read(file.take(limit 
as u64))?
+                            
file_compression_type.convert_read(file.take(limit))?
                         }
                     };
 
-                    if format_array {
-                        // Handle JSON array format
-                        let batches = read_json_array_to_batches(
-                            BufReader::new(bytes),
-                            schema,
-                            batch_size,
-                        )?;
-                        
Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    if newline_delimited {
+                        // NDJSON: use BufReader directly
+                        let reader = BufReader::new(bytes);
+                        let arrow_reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(reader)?;
+
+                        Ok(futures::stream::iter(arrow_reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
                     } else {
-                        let reader = ReaderBuilder::new(schema)
+                        // JSON array format: wrap with streaming converter
+                        let ndjson_reader = 
JsonArrayToNdjsonReader::with_capacity(
+                            bytes,
+                            JSON_CONVERTER_BUFFER_SIZE,
+                        );
+                        let arrow_reader = ReaderBuilder::new(schema)
                             .with_batch_size(batch_size)
-                            .build(BufReader::new(bytes))?;
-                        Ok(futures::stream::iter(reader)
+                            .build(ndjson_reader)?;
+
+                        Ok(futures::stream::iter(arrow_reader)
                             .map(|r| r.map_err(Into::into))
                             .boxed())
                     }
                 }
                 GetResultPayload::Stream(s) => {
-                    if format_array {
-                        // For streaming, we need to collect all bytes first
-                        let bytes = s
-                            .map_err(DataFusionError::from)
-                            .try_fold(Vec::new(), |mut acc, chunk| async move {
-                                acc.extend_from_slice(&chunk);
-                                Ok(acc)
-                            })
-                            .await?;
-                        let decompressed = file_compression_type
-                            .convert_read(std::io::Cursor::new(bytes))?;
-                        let batches = read_json_array_to_batches(
-                            BufReader::new(decompressed),
-                            schema,
-                            batch_size,
-                        )?;
-                        
Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
-                    } else {
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) streaming reader
                         let s = s.map_err(DataFusionError::from);
                         let decoder = ReaderBuilder::new(schema)
                             .with_batch_size(batch_size)
@@ -286,6 +340,89 @@ impl FileOpener for JsonOpener {
                             
DecoderDeserializer::new(JsonDecoder::new(decoder)),
                         );
                         Ok(stream.map_err(Into::into).boxed())
+                    } else {
+                        // JSON array format: streaming conversion with 
channel-based byte transfer
+                        //
+                        // Architecture:
+                        // 1. Async task reads from object store stream, 
decompresses, sends to channel
+                        // 2. Blocking task receives bytes, converts JSON 
array to NDJSON, parses to Arrow
+                        // 3. RecordBatches are sent back via another channel
+                        //
+                        // Memory budget (~32MB):
+                        // - sync_channel: CHANNEL_BUFFER_SIZE chunks (~16MB)
+                        // - JsonArrayToNdjsonReader: 2 × 
JSON_CONVERTER_BUFFER_SIZE (~4MB)
+                        // - Arrow JsonReader internal buffer (~8MB)
+                        // - Miscellaneous (~4MB)
+
+                        let s = s.map_err(DataFusionError::from);
+                        let decompressed_stream =
+                            file_compression_type.convert_stream(s.boxed())?;
+
+                        // Channel for bytes: async producer -> sync consumer
+                        let (byte_tx, byte_rx) =
+                            std::sync::mpsc::sync_channel::<bytes::Bytes>(
+                                CHANNEL_BUFFER_SIZE,
+                            );
+
+                        // Channel for results: sync producer -> async consumer
+                        let (result_tx, result_rx) = 
tokio::sync::mpsc::channel(2);
+
+                        // Async task: read from object store stream and send 
bytes to channel
+                        // Store the SpawnedTask to keep it alive until stream 
is dropped
+                        let read_task = SpawnedTask::spawn(async move {
+                            tokio::pin!(decompressed_stream);
+                            while let Some(chunk) = 
decompressed_stream.next().await {
+                                match chunk {
+                                    Ok(bytes) => {
+                                        if byte_tx.send(bytes).is_err() {
+                                            break; // Consumer dropped
+                                        }
+                                    }
+                                    Err(e) => {
+                                        log::error!("Error reading JSON 
stream: {e}");
+                                        break;
+                                    }
+                                }
+                            }
+                            // byte_tx dropped here, signals EOF to 
ChannelReader
+                        });
+
+                        // Blocking task: receive bytes from channel and parse 
JSON
+                        // Store the SpawnedTask to keep it alive until stream 
is dropped
+                        let parse_task = SpawnedTask::spawn_blocking(move || {
+                            let channel_reader = ChannelReader::new(byte_rx);
+                            let ndjson_reader = 
JsonArrayToNdjsonReader::with_capacity(
+                                channel_reader,
+                                JSON_CONVERTER_BUFFER_SIZE,
+                            );
+
+                            match ReaderBuilder::new(schema)
+                                .with_batch_size(batch_size)
+                                .build(ndjson_reader)
+                            {
+                                Ok(arrow_reader) => {
+                                    for batch_result in arrow_reader {
+                                        if 
result_tx.blocking_send(batch_result).is_err()
+                                        {
+                                            break; // Receiver dropped
+                                        }
+                                    }
+                                }
+                                Err(e) => {
+                                    let _ = result_tx.blocking_send(Err(e));
+                                }
+                            }
+                            // result_tx dropped here, closes the stream
+                        });
+
+                        // Wrap in JsonArrayStream to keep tasks alive until 
stream is consumed
+                        let stream = JsonArrayStream {
+                            inner: ReceiverStream::new(result_rx),
+                            _read_task: read_task,
+                            _parse_task: parse_task,
+                        };
+
+                        Ok(stream.map(|r| r.map_err(Into::into)).boxed())
                     }
                 }
             }
@@ -293,40 +430,7 @@ impl FileOpener for JsonOpener {
     }
 }
 
-/// Read JSON array format and convert to RecordBatches
-fn read_json_array_to_batches<R: Read>(
-    mut reader: R,
-    schema: SchemaRef,
-    batch_size: usize,
-) -> Result<Vec<RecordBatch>> {
-    use arrow::json::ReaderBuilder;
-
-    let mut content = String::new();
-    reader.read_to_string(&mut content)?;
-
-    // Parse JSON array
-    let values: Vec<serde_json::Value> = serde_json::from_str(&content)
-        .map_err(|e| DataFusionError::External(Box::new(e)))?;
-
-    if values.is_empty() {
-        return Ok(vec![RecordBatch::new_empty(schema)]);
-    }
-
-    // Convert to NDJSON string for arrow-json reader
-    let ndjson: String = values
-        .iter()
-        .map(|v| v.to_string())
-        .collect::<Vec<_>>()
-        .join("\n");
-
-    let cursor = std::io::Cursor::new(ndjson);
-    let reader = ReaderBuilder::new(schema)
-        .with_batch_size(batch_size)
-        .build(cursor)?;
-
-    reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
-}
-
+/// Write the results of a DataFusion execution plan to JSON files.
 pub async fn plan_to_json(
     task_ctx: Arc<TaskContext>,
     plan: Arc<dyn ExecutionPlan>,
@@ -381,3 +485,307 @@ pub async fn plan_to_json(
 
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use bytes::Bytes;
+    use datafusion_datasource::FileRange;
+    use futures::TryStreamExt;
+    use object_store::memory::InMemory;
+    use object_store::path::Path;
+    use object_store::PutPayload;
+
+    /// Helper to create a test schema
+    fn test_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, true),
+            Field::new("name", DataType::Utf8, true),
+        ]))
+    }
+
+    #[tokio::test]
+    async fn test_json_array_from_file() -> Result<()> {
+        // Test reading JSON array format from a file
+        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": 
"bob"}]"#;
+
+        let store = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+        store
+            .put(&path, PutPayload::from_static(json_data.as_bytes()))
+            .await?;
+
+        let opener = JsonOpener::new(
+            1024,
+            test_schema(),
+            FileCompressionType::UNCOMPRESSED,
+            store.clone(),
+            false, // JSON array format
+        );
+
+        let meta = store.head(&path).await?;
+        let file = PartitionedFile::new(path.to_string(), meta.size);
+
+        let stream = opener.open(file)?.await?;
+        let batches: Vec<_> = stream.try_collect().await?;
+
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 2);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_from_stream() -> Result<()> {
+        // Test reading JSON array format from object store stream (simulates 
S3)
+        let json_data = r#"[{"id": 1, "name": "alice"}, {"id": 2, "name": 
"bob"}, {"id": 3, "name": "charlie"}]"#;
+
+        // Use InMemory store which returns Stream payload
+        let store = Arc::new(InMemory::new());
+        let path = Path::from("test_stream.json");
+        store
+            .put(&path, PutPayload::from_static(json_data.as_bytes()))
+            .await?;
+
+        let opener = JsonOpener::new(
+            2, // small batch size to test multiple batches
+            test_schema(),
+            FileCompressionType::UNCOMPRESSED,
+            store.clone(),
+            false, // JSON array format
+        );
+
+        let meta = store.head(&path).await?;
+        let file = PartitionedFile::new(path.to_string(), meta.size);
+
+        let stream = opener.open(file)?.await?;
+        let batches: Vec<_> = stream.try_collect().await?;
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 3);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_nested_objects() -> Result<()> {
+        // Test JSON array with nested objects and arrays
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, true),
+            Field::new("data", DataType::Utf8, true),
+        ]));
+
+        let json_data = r#"[
+            {"id": 1, "data": "{\"nested\": true}"},
+            {"id": 2, "data": "[1, 2, 3]"}
+        ]"#;
+
+        let store = Arc::new(InMemory::new());
+        let path = Path::from("nested.json");
+        store
+            .put(&path, PutPayload::from_static(json_data.as_bytes()))
+            .await?;
+
+        let opener = JsonOpener::new(
+            1024,
+            schema,
+            FileCompressionType::UNCOMPRESSED,
+            store.clone(),
+            false,
+        );
+
+        let meta = store.head(&path).await?;
+        let file = PartitionedFile::new(path.to_string(), meta.size);
+
+        let stream = opener.open(file)?.await?;
+        let batches: Vec<_> = stream.try_collect().await?;
+
+        assert_eq!(batches[0].num_rows(), 2);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_empty() -> Result<()> {
+        // Test empty JSON array
+        let json_data = "[]";
+
+        let store = Arc::new(InMemory::new());
+        let path = Path::from("empty.json");
+        store
+            .put(&path, PutPayload::from_static(json_data.as_bytes()))
+            .await?;
+
+        let opener = JsonOpener::new(
+            1024,
+            test_schema(),
+            FileCompressionType::UNCOMPRESSED,
+            store.clone(),
+            false,
+        );
+
+        let meta = store.head(&path).await?;
+        let file = PartitionedFile::new(path.to_string(), meta.size);
+
+        let stream = opener.open(file)?.await?;
+        let batches: Vec<_> = stream.try_collect().await?;
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 0);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_range_not_supported() {
+        // Test that range-based scanning returns error for JSON array format
+        let store = Arc::new(InMemory::new());
+        let path = Path::from("test.json");
+        store
+            .put(&path, PutPayload::from_static(b"[]"))
+            .await
+            .unwrap();
+
+        let opener = JsonOpener::new(
+            1024,
+            test_schema(),
+            FileCompressionType::UNCOMPRESSED,
+            store.clone(),
+            false, // JSON array format
+        );
+
+        let meta = store.head(&path).await.unwrap();
+        let mut file = PartitionedFile::new(path.to_string(), meta.size);
+        file.range = Some(FileRange { start: 0, end: 10 });
+
+        let result = opener.open(file);
+        match result {
+            Ok(_) => panic!("Expected error for range-based JSON array 
scanning"),
+            Err(e) => {
+                assert!(
+                    e.to_string().contains("does not support range-based"),
+                    "Unexpected error message: {e}"
+                );
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_ndjson_still_works() -> Result<()> {
+        // Ensure NDJSON format still works correctly
+        let json_data =
+            "{\"id\": 1, \"name\": \"alice\"}\n{\"id\": 2, \"name\": 
\"bob\"}\n";
+
+        let store = Arc::new(InMemory::new());
+        let path = Path::from("test.ndjson");
+        store
+            .put(&path, PutPayload::from_static(json_data.as_bytes()))
+            .await?;
+
+        let opener = JsonOpener::new(
+            1024,
+            test_schema(),
+            FileCompressionType::UNCOMPRESSED,
+            store.clone(),
+            true, // NDJSON format
+        );
+
+        let meta = store.head(&path).await?;
+        let file = PartitionedFile::new(path.to_string(), meta.size);
+
+        let stream = opener.open(file)?.await?;
+        let batches: Vec<_> = stream.try_collect().await?;
+
+        assert_eq!(batches.len(), 1);
+        assert_eq!(batches[0].num_rows(), 2);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_large_file() -> Result<()> {
+        // Test with a larger JSON array to verify streaming works
+        let mut json_data = String::from("[");
+        for i in 0..1000 {
+            if i > 0 {
+                json_data.push(',');
+            }
+            json_data.push_str(&format!(r#"{{"id": {i}, "name": 
"user{i}"}}"#));
+        }
+        json_data.push(']');
+
+        let store = Arc::new(InMemory::new());
+        let path = Path::from("large.json");
+        store
+            .put(&path, PutPayload::from(Bytes::from(json_data)))
+            .await?;
+
+        let opener = JsonOpener::new(
+            100, // batch size of 100
+            test_schema(),
+            FileCompressionType::UNCOMPRESSED,
+            store.clone(),
+            false,
+        );
+
+        let meta = store.head(&path).await?;
+        let file = PartitionedFile::new(path.to_string(), meta.size);
+
+        let stream = opener.open(file)?.await?;
+        let batches: Vec<_> = stream.try_collect().await?;
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 1000);
+
+        // Should have multiple batches due to batch_size=100
+        assert!(batches.len() >= 10);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_stream_cancellation() -> Result<()> {
+        // Test that cancellation works correctly (tasks are aborted when 
stream is dropped)
+        let mut json_data = String::from("[");
+        for i in 0..10000 {
+            if i > 0 {
+                json_data.push(',');
+            }
+            json_data.push_str(&format!(r#"{{"id": {i}, "name": 
"user{i}"}}"#));
+        }
+        json_data.push(']');
+
+        let store = Arc::new(InMemory::new());
+        let path = Path::from("cancel_test.json");
+        store
+            .put(&path, PutPayload::from(Bytes::from(json_data)))
+            .await?;
+
+        let opener = JsonOpener::new(
+            10, // small batch size
+            test_schema(),
+            FileCompressionType::UNCOMPRESSED,
+            store.clone(),
+            false,
+        );
+
+        let meta = store.head(&path).await?;
+        let file = PartitionedFile::new(path.to_string(), meta.size);
+
+        let mut stream = opener.open(file)?.await?;
+
+        // Read only first batch, then drop the stream (simulating 
cancellation)
+        let first_batch = stream.next().await;
+        assert!(first_batch.is_some());
+
+        // Drop the stream - this should abort the spawned tasks via 
SpawnedTask's Drop
+        drop(stream);
+
+        // Give tasks time to be aborted
+        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+
+        // If we reach here without hanging, cancellation worked
+        Ok(())
+    }
+}
diff --git a/datafusion/datasource-json/src/utils.rs 
b/datafusion/datasource-json/src/utils.rs
new file mode 100644
index 0000000000..dfc3cccaa4
--- /dev/null
+++ b/datafusion/datasource-json/src/utils.rs
@@ -0,0 +1,734 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utility types for JSON processing
+
+use std::io::{BufRead, Read};
+use std::sync::mpsc::Receiver;
+
+use bytes::Bytes;
+
+// ============================================================================
+// JsonArrayToNdjsonReader - Streaming JSON Array to NDJSON Converter
+// ============================================================================
+//
+// Architecture:
+//
+// ```text
+// ┌─────────────────────────────────────────────────────────────┐
+// │  JSON Array File (potentially very large, e.g. 33GB)       │
+// │  [{"a":1}, {"a":2}, {"a":3}, ...... {"a":1000000}]         │
+// └─────────────────────────────────────────────────────────────┘
+//                           │
+//                           ▼ read chunks via ChannelReader
+//                 ┌───────────────────┐
+//                 │ JsonArrayToNdjson │  ← character substitution only:
+//                 │      Reader       │    '[' skip, ',' → '\n', ']' stop
+//                 └───────────────────┘
+//                           │
+//                           ▼ outputs NDJSON format
+//                 ┌───────────────────┐
+//                 │   Arrow Reader    │  ← internal buffer, batch parsing
+//                 │  batch_size=8192  │
+//                 └───────────────────┘
+//                           │
+//                           ▼ outputs RecordBatch
+//                 ┌───────────────────┐
+//                 │   RecordBatch     │
+//                 └───────────────────┘
+// ```
+//
+// Memory Efficiency:
+//
+// | Approach                              | Memory for 33GB file | Parse 
count |
+// 
|---------------------------------------|----------------------|-------------|
+// | Load entire file + serde_json         | ~100GB+              | 3x         
 |
+// | Streaming with JsonArrayToNdjsonReader| ~32MB (configurable) | 1x         
 |
+//
+// Design Note:
+//
+// This implementation uses `inner: R` directly (not `BufReader<R>`) and 
manages
+// its own input buffer. This is critical for compatibility with `SyncIoBridge`
+// and `ChannelReader` in `spawn_blocking` contexts.
+//
+
+/// Default buffer size for JsonArrayToNdjsonReader (2MB for better throughput)
+const DEFAULT_BUF_SIZE: usize = 2 * 1024 * 1024;
+
+/// Parser state for JSON array streaming
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum JsonArrayState {
+    /// Initial state, looking for opening '['
+    Start,
+    /// Inside the JSON array, processing objects
+    InArray,
+    /// Reached the closing ']', finished
+    Done,
+}
+
+/// A streaming reader that converts JSON array format to NDJSON format.
+///
+/// This reader wraps an underlying reader containing JSON array data
+/// `[{...}, {...}, ...]` and transforms it on-the-fly to newline-delimited
+/// JSON format that Arrow's JSON reader can process.
+///
+/// Implements both `Read` and `BufRead` traits for compatibility with Arrow's
+/// `ReaderBuilder::build()` which requires `BufRead`.
+///
+/// # Transformation Rules
+///
+/// - Skip leading `[` and whitespace before it
+/// - Convert top-level `,` (between objects) to `\n`
+/// - Skip whitespace at top level (between objects)
+/// - Stop at trailing `]`
+/// - Preserve everything inside objects (including nested `[`, `]`, `,`)
+/// - Properly handle strings (ignore special chars inside quotes)
+///
+/// # Example
+///
+/// ```text
+/// Input:  [{"a":1}, {"b":[1,2]}, {"c":"x,y"}]
+/// Output: {"a":1}
+///         {"b":[1,2]}
+///         {"c":"x,y"}
+/// ```
+pub struct JsonArrayToNdjsonReader<R: Read> {
+    /// Inner reader - we use R directly (not `BufReader<R>`) for SyncIoBridge 
compatibility
+    inner: R,
+    state: JsonArrayState,
+    /// Tracks nesting depth of `{` and `[` to identify top-level commas
+    depth: i32,
+    /// Whether we're currently inside a JSON string
+    in_string: bool,
+    /// Whether the next character is escaped (after `\`)
+    escape_next: bool,
+    /// Input buffer - stores raw bytes read from inner reader
+    input_buffer: Vec<u8>,
+    /// Current read position in input buffer
+    input_pos: usize,
+    /// Number of valid bytes in input buffer
+    input_filled: usize,
+    /// Output buffer - stores transformed NDJSON bytes
+    output_buffer: Vec<u8>,
+    /// Current read position in output buffer
+    output_pos: usize,
+    /// Number of valid bytes in output buffer
+    output_filled: usize,
+    /// Whether trailing non-whitespace content was detected after ']'
+    has_trailing_content: bool,
+}
+
+impl<R: Read> JsonArrayToNdjsonReader<R> {
+    /// Create a new streaming reader that converts JSON array to NDJSON.
+    pub fn new(reader: R) -> Self {
+        Self::with_capacity(reader, DEFAULT_BUF_SIZE)
+    }
+
+    /// Create a new streaming reader with custom buffer size.
+    ///
+    /// Larger buffers improve throughput but use more memory.
+    /// Total memory usage is approximately 2 * capacity (input + output 
buffers).
+    pub fn with_capacity(reader: R, capacity: usize) -> Self {
+        Self {
+            inner: reader,
+            state: JsonArrayState::Start,
+            depth: 0,
+            in_string: false,
+            escape_next: false,
+            input_buffer: vec![0; capacity],
+            input_pos: 0,
+            input_filled: 0,
+            output_buffer: vec![0; capacity],
+            output_pos: 0,
+            output_filled: 0,
+            has_trailing_content: false,
+        }
+    }
+
+    /// Check if the JSON array was properly terminated.
+    ///
+    /// This should be called after all data has been read.
+    ///
+    /// Returns an error if:
+    /// - Unbalanced braces/brackets (depth != 0)
+    /// - Unterminated string
+    /// - Missing closing `]`
+    /// - Unexpected trailing content after `]`
+    pub fn validate_complete(&self) -> std::io::Result<()> {
+        if self.depth != 0 {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Malformed JSON array: unbalanced braces or brackets",
+            ));
+        }
+        if self.in_string {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Malformed JSON array: unterminated string",
+            ));
+        }
+        if self.state != JsonArrayState::Done {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Incomplete JSON array: expected closing bracket ']'",
+            ));
+        }
+        if self.has_trailing_content {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Malformed JSON: unexpected trailing content after ']'",
+            ));
+        }
+        Ok(())
+    }
+
+    /// Process a single byte and return the transformed byte (if any)
+    #[inline]
+    fn process_byte(&mut self, byte: u8) -> Option<u8> {
+        match self.state {
+            JsonArrayState::Start => {
+                // Looking for the opening '[', skip whitespace
+                if byte == b'[' {
+                    self.state = JsonArrayState::InArray;
+                }
+                // Skip whitespace and the '[' itself
+                None
+            }
+            JsonArrayState::InArray => {
+                // Handle escape sequences in strings
+                if self.escape_next {
+                    self.escape_next = false;
+                    return Some(byte);
+                }
+
+                if self.in_string {
+                    // Inside a string: handle escape and closing quote
+                    match byte {
+                        b'\\' => self.escape_next = true,
+                        b'"' => self.in_string = false,
+                        _ => {}
+                    }
+                    Some(byte)
+                } else {
+                    // Outside strings: track depth and transform
+                    match byte {
+                        b'"' => {
+                            self.in_string = true;
+                            Some(byte)
+                        }
+                        b'{' | b'[' => {
+                            self.depth += 1;
+                            Some(byte)
+                        }
+                        b'}' => {
+                            self.depth -= 1;
+                            Some(byte)
+                        }
+                        b']' => {
+                            if self.depth == 0 {
+                                // Top-level ']' means end of array
+                                self.state = JsonArrayState::Done;
+                                None
+                            } else {
+                                // Nested ']' inside an object
+                                self.depth -= 1;
+                                Some(byte)
+                            }
+                        }
+                        b',' if self.depth == 0 => {
+                            // Top-level comma between objects → newline
+                            Some(b'\n')
+                        }
+                        _ => {
+                            // At depth 0, skip whitespace between objects
+                            if self.depth == 0 && byte.is_ascii_whitespace() {
+                                None
+                            } else {
+                                Some(byte)
+                            }
+                        }
+                    }
+                }
+            }
+            JsonArrayState::Done => {
+                // After ']', check for non-whitespace trailing content
+                if !byte.is_ascii_whitespace() {
+                    self.has_trailing_content = true;
+                }
+                None
+            }
+        }
+    }
+
+    /// Refill input buffer from inner reader if needed.
+    /// Returns true if there's data available, false on EOF.
+    fn refill_input_if_needed(&mut self) -> std::io::Result<bool> {
+        if self.input_pos >= self.input_filled {
+            // Input buffer exhausted, read more from inner
+            let bytes_read = self.inner.read(&mut self.input_buffer)?;
+            if bytes_read == 0 {
+                return Ok(false); // EOF
+            }
+            self.input_pos = 0;
+            self.input_filled = bytes_read;
+        }
+        Ok(true)
+    }
+
+    /// Fill the output buffer with transformed data.
+    ///
+    /// This method manages its own input buffer, reading from the inner reader
+    /// as needed. When the output buffer is full, we stop processing but 
preserve
+    /// the current position in the input buffer for the next call.
+    fn fill_output_buffer(&mut self) -> std::io::Result<()> {
+        let mut write_pos = 0;
+
+        while write_pos < self.output_buffer.len() {
+            // Refill input buffer if exhausted
+            if !self.refill_input_if_needed()? {
+                break; // EOF
+            }
+
+            // Process bytes from input buffer
+            while self.input_pos < self.input_filled
+                && write_pos < self.output_buffer.len()
+            {
+                let byte = self.input_buffer[self.input_pos];
+                self.input_pos += 1;
+
+                if let Some(transformed) = self.process_byte(byte) {
+                    self.output_buffer[write_pos] = transformed;
+                    write_pos += 1;
+                }
+            }
+        }
+
+        self.output_pos = 0;
+        self.output_filled = write_pos;
+        Ok(())
+    }
+}
+
+impl<R: Read> Read for JsonArrayToNdjsonReader<R> {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        // If output buffer is empty, fill it
+        if self.output_pos >= self.output_filled {
+            self.fill_output_buffer()?;
+            if self.output_filled == 0 {
+                return Ok(0); // EOF
+            }
+        }
+
+        // Copy from output buffer to caller's buffer
+        let available = self.output_filled - self.output_pos;
+        let to_copy = std::cmp::min(available, buf.len());
+        buf[..to_copy].copy_from_slice(
+            &self.output_buffer[self.output_pos..self.output_pos + to_copy],
+        );
+        self.output_pos += to_copy;
+        Ok(to_copy)
+    }
+}
+
+impl<R: Read> BufRead for JsonArrayToNdjsonReader<R> {
+    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
+        if self.output_pos >= self.output_filled {
+            self.fill_output_buffer()?;
+        }
+        Ok(&self.output_buffer[self.output_pos..self.output_filled])
+    }
+
+    fn consume(&mut self, amt: usize) {
+        self.output_pos = std::cmp::min(self.output_pos + amt, 
self.output_filled);
+    }
+}
+
+// ============================================================================
+// ChannelReader - Sync reader that receives bytes from async channel
+// ============================================================================
+//
+// Architecture:
+//
+// ```text
+// ┌─────────────────────────────────────────────────────────────────────────┐
+// │                         S3 / MinIO (async)                              │
+// │                    (33GB JSON Array File)                               │
+// └─────────────────────────────────────────────────────────────────────────┘
+//                                 │
+//                                 ▼ async stream (Bytes chunks)
+// ┌─────────────────────────────────────────────────────────────────────────┐
+// │                      Async Task (tokio runtime)                         │
+// │              while let Some(chunk) = stream.next().await                │
+// │                     byte_tx.send(chunk)                                 │
+// └─────────────────────────────────────────────────────────────────────────┘
+//                                 │
+//                                 ▼ std::sync::mpsc::sync_channel<Bytes>
+//                                 │   (bounded, ~32MB buffer)
+//                                 ▼
+// ┌─────────────────────────────────────────────────────────────────────────┐
+// │                   Blocking Task (spawn_blocking)                        │
+// │  ┌──────────────┐   ┌────────────────────────┐   ┌──────────────────┐  │
+// │  │ChannelReader │ → │JsonArrayToNdjsonReader │ → │ Arrow JsonReader │  │
+// │  │   (Read)     │   │  [{},...] → {}\n{}     │   │  (RecordBatch)   │  │
+// │  └──────────────┘   └────────────────────────┘   └──────────────────┘  │
+// └─────────────────────────────────────────────────────────────────────────┘
+//                                 │
+//                                 ▼ tokio::sync::mpsc::channel<RecordBatch>
+// ┌─────────────────────────────────────────────────────────────────────────┐
+// │                      ReceiverStream (async)                             │
+// │                   → DataFusion execution engine                         │
+// └─────────────────────────────────────────────────────────────────────────┘
+// ```
+//
+// Memory Budget (~32MB total):
+// - sync_channel buffer: 128 chunks × ~128KB = ~16MB
+// - JsonArrayToNdjsonReader: 2 × 2MB = 4MB
+// - Arrow JsonReader internal: ~8MB
+// - Miscellaneous: ~4MB
+//
+
+/// A synchronous `Read` implementation that receives bytes from a channel.
+///
+/// This enables true streaming between async and sync contexts without
+/// loading the entire file into memory.
+pub struct ChannelReader {
+    rx: Receiver<Bytes>,
+    current: Option<Bytes>,
+    pos: usize,
+}
+
+impl ChannelReader {
+    /// Create a new ChannelReader from a receiver.
+    pub fn new(rx: Receiver<Bytes>) -> Self {
+        Self {
+            rx,
+            current: None,
+            pos: 0,
+        }
+    }
+}
+
+impl Read for ChannelReader {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        loop {
+            // If we have current chunk with remaining data, read from it
+            if let Some(ref chunk) = self.current {
+                let remaining = chunk.len() - self.pos;
+                if remaining > 0 {
+                    let to_copy = std::cmp::min(remaining, buf.len());
+                    buf[..to_copy].copy_from_slice(&chunk[self.pos..self.pos + 
to_copy]);
+                    self.pos += to_copy;
+                    return Ok(to_copy);
+                }
+            }
+
+            // Current chunk exhausted, get next from channel
+            match self.rx.recv() {
+                Ok(bytes) => {
+                    self.current = Some(bytes);
+                    self.pos = 0;
+                    // Loop back to read from new chunk
+                }
+                Err(_) => return Ok(0), // Channel closed = EOF
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_json_array_to_ndjson_simple() {
+        let input = r#"[{"a":1}, {"a":2}, {"a":3}]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        assert_eq!(output, "{\"a\":1}\n{\"a\":2}\n{\"a\":3}");
+    }
+
+    #[test]
+    fn test_json_array_to_ndjson_nested() {
+        let input = r#"[{"a":{"b":1}}, {"c":[1,2,3]}]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        assert_eq!(output, "{\"a\":{\"b\":1}}\n{\"c\":[1,2,3]}");
+    }
+
+    #[test]
+    fn test_json_array_to_ndjson_strings_with_special_chars() {
+        let input = r#"[{"a":"[1,2]"}, {"b":"x,y"}]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        assert_eq!(output, "{\"a\":\"[1,2]\"}\n{\"b\":\"x,y\"}");
+    }
+
+    #[test]
+    fn test_json_array_to_ndjson_escaped_quotes() {
+        let input = r#"[{"a":"say \"hello\""}, {"b":1}]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        assert_eq!(output, "{\"a\":\"say \\\"hello\\\"\"}\n{\"b\":1}");
+    }
+
+    #[test]
+    fn test_json_array_to_ndjson_empty() {
+        let input = r#"[]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        assert_eq!(output, "");
+    }
+
+    #[test]
+    fn test_json_array_to_ndjson_single_element() {
+        let input = r#"[{"a":1}]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        assert_eq!(output, "{\"a\":1}");
+    }
+
+    #[test]
+    fn test_json_array_to_ndjson_bufread() {
+        let input = r#"[{"a":1}, {"a":2}]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+
+        let buf = reader.fill_buf().unwrap();
+        assert!(!buf.is_empty());
+
+        let first_len = buf.len();
+        reader.consume(first_len);
+
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+    }
+
+    #[test]
+    fn test_json_array_to_ndjson_whitespace() {
+        let input = r#"  [  {"a":1}  ,  {"a":2}  ]  "#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        // Top-level whitespace is skipped, internal whitespace preserved
+        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");
+    }
+
+    #[test]
+    fn test_validate_complete_valid_json() {
+        let valid_json = r#"[{"a":1},{"a":2}]"#;
+        let mut reader = JsonArrayToNdjsonReader::new(valid_json.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+        reader.validate_complete().unwrap();
+    }
+
+    #[test]
+    fn test_json_array_with_trailing_junk() {
+        let input = r#" [ {"a":1} , {"a":2} ] some { junk [ here ] "#;
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+
+        // Should extract the valid array content
+        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");
+
+        // But validation should catch the trailing junk
+        let result = reader.validate_complete();
+        assert!(result.is_err());
+        let err_msg = result.unwrap_err().to_string();
+        assert!(
+            err_msg.contains("trailing content")
+                || err_msg.contains("Unexpected trailing"),
+            "Expected trailing content error, got: {err_msg}"
+        );
+    }
+
+    #[test]
+    fn test_validate_complete_incomplete_array() {
+        let invalid_json = r#"[{"a":1},{"a":2}"#; // Missing closing ]
+        let mut reader = JsonArrayToNdjsonReader::new(invalid_json.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+
+        let result = reader.validate_complete();
+        assert!(result.is_err());
+        let err_msg = result.unwrap_err().to_string();
+        assert!(
+            err_msg.contains("expected closing bracket")
+                || err_msg.contains("missing closing"),
+            "Expected missing bracket error, got: {err_msg}"
+        );
+    }
+
+    #[test]
+    fn test_validate_complete_unbalanced_braces() {
+        let invalid_json = r#"[{"a":1},{"a":2]"#; // Wrong closing bracket
+        let mut reader = JsonArrayToNdjsonReader::new(invalid_json.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+
+        let result = reader.validate_complete();
+        assert!(result.is_err());
+        let err_msg = result.unwrap_err().to_string();
+        assert!(
+            err_msg.contains("unbalanced")
+                || err_msg.contains("expected closing bracket"),
+            "Expected unbalanced or missing bracket error, got: {err_msg}"
+        );
+    }
+
+    #[test]
+    fn test_validate_complete_valid_with_trailing_whitespace() {
+        let input = r#"[{"a":1},{"a":2}]
+    "#; // Trailing whitespace is OK
+        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+
+        // Whitespace after ] should be allowed
+        reader.validate_complete().unwrap();
+    }
+
+    /// Test that data is not lost at buffer boundaries.
+    ///
+    /// This test creates input larger than the internal buffer to verify
+    /// that newline characters are not dropped when they occur at buffer 
boundaries.
+    #[test]
+    fn test_buffer_boundary_no_data_loss() {
+        // Create objects ~9KB each, so 10 objects = ~90KB
+        let large_value = "x".repeat(9000);
+
+        let mut objects = vec![];
+        for i in 0..10 {
+            objects.push(format!(r#"{{"id":{i},"data":"{large_value}"}}"#));
+        }
+
+        let input = format!("[{}]", objects.join(","));
+
+        // Use small buffer to force multiple fill cycles
+        let mut reader = 
JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 8192);
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+
+        // Verify correct number of newlines (9 newlines separate 10 objects)
+        let newline_count = output.matches('\n').count();
+        assert_eq!(
+            newline_count, 9,
+            "Expected 9 newlines separating 10 objects, got {newline_count}"
+        );
+
+        // Verify each line is valid JSON
+        for (i, line) in output.lines().enumerate() {
+            let parsed: Result<serde_json::Value, _> = 
serde_json::from_str(line);
+            assert!(
+                parsed.is_ok(),
+                "Line {} is not valid JSON: {}...",
+                i,
+                &line[..100.min(line.len())]
+            );
+
+            // Verify the id field matches expected value
+            let value = parsed.unwrap();
+            assert_eq!(
+                value["id"].as_i64(),
+                Some(i as i64),
+                "Object {i} has wrong id"
+            );
+        }
+    }
+
+    /// Test with real-world-like data format (with leading whitespace and 
newlines)
+    #[test]
+    fn test_real_world_format_large() {
+        let large_value = "x".repeat(8000);
+
+        // Format similar to real files: opening bracket on its own line,
+        // each object indented with 2 spaces
+        let mut objects = vec![];
+        for i in 0..10 {
+            objects.push(format!(r#"  {{"id":{i},"data":"{large_value}"}}"#));
+        }
+
+        let input = format!("[\n{}\n]", objects.join(",\n"));
+
+        let mut reader = 
JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 8192);
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+
+        let lines: Vec<&str> = output.lines().collect();
+        assert_eq!(lines.len(), 10, "Expected 10 objects");
+
+        for (i, line) in lines.iter().enumerate() {
+            assert!(
+                line.starts_with("{\"id\""),
+                "Line {} should start with object, got: {}...",
+                i,
+                &line[..50.min(line.len())]
+            );
+        }
+    }
+
+    /// Test ChannelReader
+    #[test]
+    fn test_channel_reader() {
+        let (tx, rx) = std::sync::mpsc::sync_channel(4);
+
+        // Send some chunks
+        tx.send(Bytes::from("Hello, ")).unwrap();
+        tx.send(Bytes::from("World!")).unwrap();
+        drop(tx); // Close channel
+
+        let mut reader = ChannelReader::new(rx);
+        let mut output = String::new();
+        reader.read_to_string(&mut output).unwrap();
+
+        assert_eq!(output, "Hello, World!");
+    }
+
+    /// Test ChannelReader with small reads
+    #[test]
+    fn test_channel_reader_small_reads() {
+        let (tx, rx) = std::sync::mpsc::sync_channel(4);
+
+        tx.send(Bytes::from("ABCDEFGHIJ")).unwrap();
+        drop(tx);
+
+        let mut reader = ChannelReader::new(rx);
+        let mut buf = [0u8; 3];
+
+        // Read in small chunks
+        assert_eq!(reader.read(&mut buf).unwrap(), 3);
+        assert_eq!(&buf, b"ABC");
+
+        assert_eq!(reader.read(&mut buf).unwrap(), 3);
+        assert_eq!(&buf, b"DEF");
+
+        assert_eq!(reader.read(&mut buf).unwrap(), 3);
+        assert_eq!(&buf, b"GHI");
+
+        assert_eq!(reader.read(&mut buf).unwrap(), 1);
+        assert_eq!(&buf[..1], b"J");
+
+        // EOF
+        assert_eq!(reader.read(&mut buf).unwrap(), 0);
+    }
+}
diff --git a/datafusion/proto-common/proto/datafusion_common.proto 
b/datafusion/proto-common/proto/datafusion_common.proto
index c598b8dcb6..5df9ff9faf 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -467,8 +467,7 @@ message CsvOptions {
 message JsonOptions {
   CompressionTypeVariant compression = 1; // Compression type
   optional uint64 schema_infer_max_rec = 2; // Optional max records for schema 
inference
-  optional uint32 compression_level = 3; // Optional compression level
-  bool format_array = 4; // Whether the JSON is in array format [{},...] 
(default false = line-delimited)
+  optional bool newline_delimited = 3; // Whether to read as newline-delimited 
JSON (default true). When false, expects JSON array format [{},...]
 }
 
 message TableParquetOptions {
diff --git a/datafusion/proto-common/src/from_proto/mod.rs 
b/datafusion/proto-common/src/from_proto/mod.rs
index 71f8cc3e78..ba8ae52202 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -1092,8 +1092,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
         Ok(JsonOptions {
             compression: compression.into(),
             schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as 
usize),
-            compression_level: proto_opts.compression_level.map(|h| h as 
usize),
-            format_array: proto_opts.format_array,
+            newline_delimited: proto_opts.newline_delimited.unwrap_or(true),
         })
     }
 }
diff --git a/datafusion/proto-common/src/generated/pbjson.rs 
b/datafusion/proto-common/src/generated/pbjson.rs
index cc646e48b0..b9dd232c18 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -4548,10 +4548,7 @@ impl serde::Serialize for JsonOptions {
         if self.schema_infer_max_rec.is_some() {
             len += 1;
         }
-        if self.compression_level.is_some() {
-            len += 1;
-        }
-        if self.format_array {
+        if self.newline_delimited.is_some() {
             len += 1;
         }
         let mut struct_ser = 
serializer.serialize_struct("datafusion_common.JsonOptions", len)?;
@@ -4565,11 +4562,8 @@ impl serde::Serialize for JsonOptions {
             #[allow(clippy::needless_borrows_for_generic_args)]
             struct_ser.serialize_field("schemaInferMaxRec", 
ToString::to_string(&v).as_str())?;
         }
-        if let Some(v) = self.compression_level.as_ref() {
-            struct_ser.serialize_field("compressionLevel", v)?;
-        }
-        if self.format_array {
-            struct_ser.serialize_field("formatArray", &self.format_array)?;
+        if let Some(v) = self.newline_delimited.as_ref() {
+            struct_ser.serialize_field("newlineDelimited", v)?;
         }
         struct_ser.end()
     }
@@ -4584,18 +4578,15 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
             "compression",
             "schema_infer_max_rec",
             "schemaInferMaxRec",
-            "compression_level",
-            "compressionLevel",
-            "format_array",
-            "formatArray",
+            "newline_delimited",
+            "newlineDelimited",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Compression,
             SchemaInferMaxRec,
-            CompressionLevel,
-            FormatArray,
+            NewlineDelimited,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -4619,8 +4610,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
                         match value {
                             "compression" => Ok(GeneratedField::Compression),
                             "schemaInferMaxRec" | "schema_infer_max_rec" => 
Ok(GeneratedField::SchemaInferMaxRec),
-                            "compressionLevel" | "compression_level" => 
Ok(GeneratedField::CompressionLevel),
-                            "formatArray" | "format_array" => 
Ok(GeneratedField::FormatArray),
+                            "newlineDelimited" | "newline_delimited" => 
Ok(GeneratedField::NewlineDelimited),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -4642,8 +4632,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
             {
                 let mut compression__ = None;
                 let mut schema_infer_max_rec__ = None;
-                let mut compression_level__ = None;
-                let mut format_array__ = None;
+                let mut newline_delimited__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::Compression => {
@@ -4660,27 +4649,18 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
                                 
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 x.0)
                             ;
                         }
-                        GeneratedField::CompressionLevel => {
-                            if compression_level__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("compressionLevel"));
-                            }
-                            compression_level__ = 
-                                
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 x.0)
-                            ;
-                        }
-                        GeneratedField::FormatArray => {
-                            if format_array__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("formatArray"));
+                        GeneratedField::NewlineDelimited => {
+                            if newline_delimited__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("newlineDelimited"));
                             }
-                            format_array__ = Some(map_.next_value()?);
+                            newline_delimited__ = map_.next_value()?;
                         }
                     }
                 }
                 Ok(JsonOptions {
                     compression: compression__.unwrap_or_default(),
                     schema_infer_max_rec: schema_infer_max_rec__,
-                    compression_level: compression_level__,
-                    format_array: format_array__.unwrap_or_default(),
+                    newline_delimited: newline_delimited__,
                 })
             }
         }
diff --git a/datafusion/proto-common/src/generated/prost.rs 
b/datafusion/proto-common/src/generated/prost.rs
index 24d6600e4d..6e8c0368fb 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -659,12 +659,9 @@ pub struct JsonOptions {
     /// Optional max records for schema inference
     #[prost(uint64, optional, tag = "2")]
     pub schema_infer_max_rec: ::core::option::Option<u64>,
-    /// Optional compression level
-    #[prost(uint32, optional, tag = "3")]
-    pub compression_level: ::core::option::Option<u32>,
-    /// Whether the JSON is in array format \[{},...\] (default false = 
line-delimited)
-    #[prost(bool, tag = "4")]
-    pub format_array: bool,
+    /// Whether to read as newline-delimited JSON (default true). When false, 
expects JSON array format \[{},...\]
+    #[prost(bool, optional, tag = "3")]
+    pub newline_delimited: ::core::option::Option<bool>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TableParquetOptions {
diff --git a/datafusion/proto-common/src/to_proto/mod.rs 
b/datafusion/proto-common/src/to_proto/mod.rs
index 69f3c269c7..33e38826de 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -986,8 +986,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
         Ok(protobuf::JsonOptions {
             compression: compression.into(),
             schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
-            compression_level: opts.compression_level.map(|h| h as u32),
-            format_array: opts.format_array,
+            newline_delimited: Some(opts.newline_delimited),
         })
     }
 }
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs 
b/datafusion/proto/src/generated/datafusion_proto_common.rs
index 24d6600e4d..6e8c0368fb 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -659,12 +659,9 @@ pub struct JsonOptions {
     /// Optional max records for schema inference
     #[prost(uint64, optional, tag = "2")]
     pub schema_infer_max_rec: ::core::option::Option<u64>,
-    /// Optional compression level
-    #[prost(uint32, optional, tag = "3")]
-    pub compression_level: ::core::option::Option<u32>,
-    /// Whether the JSON is in array format \[{},...\] (default false = 
line-delimited)
-    #[prost(bool, tag = "4")]
-    pub format_array: bool,
+    /// Whether to read as newline-delimited JSON (default true). When false, 
expects JSON array format \[{},...\]
+    #[prost(bool, optional, tag = "3")]
+    pub newline_delimited: ::core::option::Option<bool>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TableParquetOptions {
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs 
b/datafusion/proto/src/logical_plan/file_formats.rs
index 3685e61542..0891e6aeb5 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -238,8 +238,7 @@ impl JsonOptionsProto {
             JsonOptionsProto {
                 compression: options.compression as i32,
                 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v 
as u64),
-                compression_level: options.compression_level.map(|v| v as u32),
-                format_array: options.format_array,
+                newline_delimited: Some(options.newline_delimited),
             }
         } else {
             JsonOptionsProto::default()
@@ -258,8 +257,7 @@ impl From<&JsonOptionsProto> for JsonOptions {
                 _ => CompressionTypeVariant::UNCOMPRESSED,
             },
             schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as 
usize),
-            compression_level: proto.compression_level.map(|v| v as usize),
-            format_array: proto.format_array,
+            newline_delimited: proto.newline_delimited.unwrap_or(true),
         }
     }
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 9c63f8078e..2ff3393178 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -28,7 +28,7 @@ use datafusion::datasource::file_format::json::{JsonFormat, 
JsonFormatFactory};
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
-use datafusion::execution::options::ArrowReadOptions;
+use datafusion::execution::options::{ArrowReadOptions, JsonReadOptions};
 use datafusion::optimizer::eliminate_nested_union::EliminateNestedUnion;
 use datafusion::optimizer::Optimizer;
 use datafusion_common::parsers::CompressionTypeVariant;
@@ -749,7 +749,7 @@ async fn create_json_scan(ctx: &SessionContext) -> 
Result<LogicalPlan, DataFusio
     ctx.register_json(
         "t1",
         "../core/tests/data/1.json",
-        NdJsonReadOptions::default(),
+        JsonReadOptions::default(),
     )
     .await?;
 
diff --git a/datafusion/sqllogictest/test_files/json.slt 
b/datafusion/sqllogictest/test_files/json.slt
index 4442a6a2d5..60bec4213d 100644
--- a/datafusion/sqllogictest/test_files/json.slt
+++ b/datafusion/sqllogictest/test_files/json.slt
@@ -151,11 +151,11 @@ physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/
 ## JSON Array Format Tests
 ##########
 
-# Test reading JSON array format file with format_array=true
+# Test reading JSON array format file with newline_delimited=false
 statement ok
 CREATE EXTERNAL TABLE json_array_test
 STORED AS JSON
-OPTIONS ('format.format_array' 'true')
+OPTIONS ('format.newline_delimited' 'false')
 LOCATION '../core/tests/data/json_array.json';
 
 query IT rowsort
@@ -168,9 +168,9 @@ SELECT a, b FROM json_array_test
 statement ok
 DROP TABLE json_array_test;
 
-# Test that reading JSON array format WITHOUT format_array option fails
-# (default is line-delimited mode which can't parse array format correctly)
+# Test that reading JSON array format WITHOUT newline_delimited option fails
+# (default is newline_delimited=true which can't parse array format correctly)
 statement error Not valid JSON
 CREATE EXTERNAL TABLE json_array_as_ndjson
 STORED AS JSON
-LOCATION '../core/tests/data/json_array.json';
\ No newline at end of file
+LOCATION '../core/tests/data/json_array.json';


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to