This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch fix/sort-merge-reservation-starvation
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit 85836852e5aaa9ad3b4ef4dbf0cec142db947211
Author: Qi Zhu <[email protected]>
AuthorDate: Thu Jan 22 18:05:06 2026 +0800

    Array json support for datafusion (#27)
---
 Cargo.lock                                         |   1 +
 datafusion-examples/examples/csv_json_opener.rs    |   1 +
 datafusion/common/src/config.rs                    |  17 ++
 datafusion/core/src/datasource/file_format/json.rs | 313 +++++++++++++++++++++
 .../core/src/datasource/file_format/options.rs     |  20 +-
 datafusion/core/tests/data/json_array.json         |   5 +
 datafusion/core/tests/data/json_empty_array.json   |   1 +
 datafusion/datasource-json/Cargo.toml              |   1 +
 datafusion/datasource-json/src/file_format.rs      |  97 ++++++-
 datafusion/datasource-json/src/source.rs           | 121 ++++++--
 .../proto-common/proto/datafusion_common.proto     |   2 +
 datafusion/proto-common/src/from_proto/mod.rs      |   2 +
 datafusion/proto-common/src/generated/pbjson.rs    |  38 +++
 datafusion/proto-common/src/generated/prost.rs     |   6 +
 datafusion/proto-common/src/to_proto/mod.rs        |   2 +
 .../proto/src/generated/datafusion_proto_common.rs |   6 +
 datafusion/proto/src/logical_plan/file_formats.rs  |   4 +
 datafusion/sqllogictest/test_files/json.slt        |  28 ++
 18 files changed, 635 insertions(+), 30 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d712eecfcc..2cfdd82851 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2156,6 +2156,7 @@ dependencies = [
  "datafusion-session",
  "futures",
  "object_store",
+ "serde_json",
  "tokio",
 ]
 
diff --git a/datafusion-examples/examples/csv_json_opener.rs 
b/datafusion-examples/examples/csv_json_opener.rs
index ef2a3eaca0..e7b809c82e 100644
--- a/datafusion-examples/examples/csv_json_opener.rs
+++ b/datafusion-examples/examples/csv_json_opener.rs
@@ -121,6 +121,7 @@ async fn json_opener() -> Result<()> {
         projected,
         FileCompressionType::UNCOMPRESSED,
         Arc::new(object_store),
+        false,
     );
 
     let scan_config = FileScanConfigBuilder::new(
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index a77fd764ee..68022c2f06 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -2813,6 +2813,23 @@ config_namespace! {
     pub struct JsonOptions {
         pub compression: CompressionTypeVariant, default = 
CompressionTypeVariant::UNCOMPRESSED
         pub schema_infer_max_rec: Option<usize>, default = None
+        pub compression_level: Option<usize>, default = None
+        /// The format of JSON input files.
+        ///
+        /// When `false` (default), expects newline-delimited JSON (NDJSON):
+        /// ```text
+        /// {"key1": 1, "key2": "val"}
+        /// {"key1": 2, "key2": "vals"}
+        /// ```
+        ///
+        /// When `true`, expects JSON array format:
+        /// ```text
+        /// [
+        ///     {"key1": 1, "key2": "val"},
+        ///     {"key1": 2, "key2": "vals"}
+        /// ]
+        /// ```
+        pub format_array: bool, default = false
     }
 }
 
diff --git a/datafusion/core/src/datasource/file_format/json.rs 
b/datafusion/core/src/datasource/file_format/json.rs
index 34d3d64f07..cce8ead46c 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -47,6 +47,7 @@ mod tests {
     use datafusion_common::stats::Precision;
 
     use datafusion_common::Result;
+    use datafusion_datasource::file_compression_type::FileCompressionType;
     use futures::StreamExt;
     use insta::assert_snapshot;
     use object_store::local::LocalFileSystem;
@@ -349,4 +350,316 @@ mod tests {
     fn fmt_batches(batches: &[RecordBatch]) -> String {
         pretty::pretty_format_batches(batches).unwrap().to_string()
     }
+
+    #[tokio::test]
+    async fn test_write_empty_json_from_sql() -> Result<()> {
+        let ctx = SessionContext::new();
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_sql.json", 
tmp_dir.path().to_string_lossy());
+        let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
+        df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), 
None)
+            .await?;
+        // Expected the file to exist and be empty
+        assert!(std::path::Path::new(&path).exists());
+        let metadata = std::fs::metadata(&path)?;
+        assert_eq!(metadata.len(), 0);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_write_empty_json_from_record_batch() -> Result<()> {
+        let ctx = SessionContext::new();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let empty_batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(arrow::array::Int64Array::from(Vec::<i64>::new())),
+                
Arc::new(arrow::array::StringArray::from(Vec::<Option<&str>>::new())),
+            ],
+        )?;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_batch.json", 
tmp_dir.path().to_string_lossy());
+        let df = ctx.read_batch(empty_batch.clone())?;
+        df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), 
None)
+            .await?;
+        // Expected the file to exist and be empty
+        assert!(std::path::Path::new(&path).exists());
+        let metadata = std::fs::metadata(&path)?;
+        assert_eq!(metadata.len(), 0);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        // Create a temporary file with JSON array format
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+                {"a": 1, "b": 2.0, "c": true},
+                {"a": 2, "b": 3.5, "c": false},
+                {"a": 3, "b": 4.0, "c": true}
+            ]"#,
+        )?;
+
+        // Test with format_array = true
+        let format = JsonFormat::default().with_format_array(true);
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await
+            .expect("Schema inference");
+
+        let fields = file_schema
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect::<Vec<_>>();
+        assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_empty() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_array.json", 
tmp_dir.path().to_string_lossy());
+        std::fs::write(&path, "[]")?;
+
+        let format = JsonFormat::default().with_format_array(true);
+        let result = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await;
+
+        assert!(result.is_err());
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("JSON array is empty"));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_with_limit() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array_limit.json", 
tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+                {"a": 1},
+                {"a": 2, "b": "extra"}
+            ]"#,
+        )?;
+
+        // Only infer from first record
+        let format = JsonFormat::default()
+            .with_format_array(true)
+            .with_schema_infer_max_rec(1);
+
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await
+            .expect("Schema inference");
+
+        // Should only have field "a" since we limited to 1 record
+        let fields = file_schema
+            .fields()
+            .iter()
+            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
+            .collect::<Vec<_>>();
+        assert_eq!(vec!["a: Int64"], fields);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_read_data() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let task_ctx = ctx.task_ctx();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        // Create a temporary file with JSON array format
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+            {"a": 1, "b": 2.0, "c": true},
+            {"a": 2, "b": 3.5, "c": false},
+            {"a": 3, "b": 4.0, "c": true}
+        ]"#,
+        )?;
+
+        let format = JsonFormat::default().with_format_array(true);
+
+        // Infer schema
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await?;
+
+        // Scan and read data
+        let exec = scan_format(
+            &ctx,
+            &format,
+            Some(file_schema),
+            tmp_dir.path().to_str().unwrap(),
+            "array.json",
+            None,
+            None,
+        )
+        .await?;
+        let batches = collect(exec, task_ctx).await?;
+
+        assert_eq!(1, batches.len());
+        assert_eq!(3, batches[0].num_columns());
+        assert_eq!(3, batches[0].num_rows());
+
+        // Verify data
+        let array_a = as_int64_array(batches[0].column(0))?;
+        assert_eq!(
+            vec![1, 2, 3],
+            (0..3).map(|i| array_a.value(i)).collect::<Vec<_>>()
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_json_array_format_with_projection() -> Result<()> {
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let task_ctx = ctx.task_ctx();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": 
"world"}]"#)?;
+
+        let format = JsonFormat::default().with_format_array(true);
+        let file_schema = format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await?;
+
+        // Project only column "a"
+        let exec = scan_format(
+            &ctx,
+            &format,
+            Some(file_schema),
+            tmp_dir.path().to_str().unwrap(),
+            "array.json",
+            Some(vec![0]),
+            None,
+        )
+        .await?;
+        let batches = collect(exec, task_ctx).await?;
+
+        assert_eq!(1, batches.len());
+        assert_eq!(1, batches[0].num_columns()); // Only 1 column projected
+        assert_eq!(2, batches[0].num_rows());
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ndjson_read_options_format_array() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        // Create a temporary file with JSON array format
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
+        std::fs::write(
+            &path,
+            r#"[
+            {"a": 1, "b": "hello"},
+            {"a": 2, "b": "world"},
+            {"a": 3, "b": "test"}
+        ]"#,
+        )?;
+
+        // Use NdJsonReadOptions with format_array = true
+        let options = NdJsonReadOptions::default().format_array(true);
+
+        ctx.register_json("json_array_table", &path, options)
+            .await?;
+
+        let result = ctx
+            .sql("SELECT a, b FROM json_array_table ORDER BY a")
+            .await?
+            .collect()
+            .await?;
+
+        assert_snapshot!(batches_to_string(&result), @r"
+    +---+-------+
+    | a | b     |
+    +---+-------+
+    | 1 | hello |
+    | 2 | world |
+    | 3 | test  |
+    +---+-------+
+    ");
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_ndjson_read_options_format_array_with_compression() -> 
Result<()> {
+        use flate2::write::GzEncoder;
+        use flate2::Compression;
+        use std::io::Write;
+
+        let ctx = SessionContext::new();
+
+        // Create a temporary gzip compressed JSON array file
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/array.json.gz", 
tmp_dir.path().to_string_lossy());
+
+        let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": 
"world"}]"#;
+        let file = std::fs::File::create(&path)?;
+        let mut encoder = GzEncoder::new(file, Compression::default());
+        encoder.write_all(json_content.as_bytes())?;
+        encoder.finish()?;
+
+        // Use NdJsonReadOptions with format_array and GZIP compression
+        let options = NdJsonReadOptions::default()
+            .format_array(true)
+            .file_compression_type(FileCompressionType::GZIP)
+            .file_extension(".json.gz");
+
+        ctx.register_json("json_array_gzip", &path, options).await?;
+
+        let result = ctx
+            .sql("SELECT a, b FROM json_array_gzip ORDER BY a")
+            .await?
+            .collect()
+            .await?;
+
+        assert_snapshot!(batches_to_string(&result), @r"
+    +---+-------+
+    | a | b     |
+    +---+-------+
+    | 1 | hello |
+    | 2 | world |
+    +---+-------+
+    ");
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/file_format/options.rs 
b/datafusion/core/src/datasource/file_format/options.rs
index e78c5f0955..fb707ed3d6 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -465,6 +465,9 @@ pub struct NdJsonReadOptions<'a> {
     pub infinite: bool,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<SortExpr>>,
+    /// Whether the JSON file is in array format `[{...}, {...}]` instead of
+    /// line-delimited format. Defaults to `false`.
+    pub format_array: bool,
 }
 
 impl Default for NdJsonReadOptions<'_> {
@@ -477,6 +480,7 @@ impl Default for NdJsonReadOptions<'_> {
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             infinite: false,
             file_sort_order: vec![],
+            format_array: false,
         }
     }
 }
@@ -523,6 +527,19 @@ impl<'a> NdJsonReadOptions<'a> {
         self.file_sort_order = file_sort_order;
         self
     }
+
+    /// Specify how many rows to read for schema inference
+    pub fn schema_infer_max_records(mut self, schema_infer_max_records: usize) 
-> Self {
+        self.schema_infer_max_records = schema_infer_max_records;
+        self
+    }
+
+    /// Specify whether the JSON file is in array format `[{...}, {...}]`
+    /// instead of line-delimited format.
+    pub fn format_array(mut self, format_array: bool) -> Self {
+        self.format_array = format_array;
+        self
+    }
 }
 
 #[async_trait]
@@ -657,7 +674,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
         let file_format = JsonFormat::default()
             .with_options(table_options.json)
             .with_schema_infer_max_rec(self.schema_infer_max_records)
-            .with_file_compression_type(self.file_compression_type.to_owned());
+            .with_file_compression_type(self.file_compression_type.to_owned())
+            .with_format_array(self.format_array);
 
         ListingOptions::new(Arc::new(file_format))
             .with_file_extension(self.file_extension)
diff --git a/datafusion/core/tests/data/json_array.json 
b/datafusion/core/tests/data/json_array.json
new file mode 100644
index 0000000000..1a8716dbf4
--- /dev/null
+++ b/datafusion/core/tests/data/json_array.json
@@ -0,0 +1,5 @@
+[
+    {"a": 1, "b": "hello"},
+    {"a": 2, "b": "world"},
+    {"a": 3, "b": "test"}
+]
diff --git a/datafusion/core/tests/data/json_empty_array.json 
b/datafusion/core/tests/data/json_empty_array.json
new file mode 100644
index 0000000000..fe51488c70
--- /dev/null
+++ b/datafusion/core/tests/data/json_empty_array.json
@@ -0,0 +1 @@
+[]
diff --git a/datafusion/datasource-json/Cargo.toml 
b/datafusion/datasource-json/Cargo.toml
index 37fa8d43a0..168ae8880e 100644
--- a/datafusion/datasource-json/Cargo.toml
+++ b/datafusion/datasource-json/Cargo.toml
@@ -44,6 +44,7 @@ datafusion-physical-plan = { workspace = true }
 datafusion-session = { workspace = true }
 futures = { workspace = true }
 object_store = { workspace = true }
+serde_json = { workspace = true }
 tokio = { workspace = true }
 
 # Note: add additional linter rules in lib.rs.
diff --git a/datafusion/datasource-json/src/file_format.rs 
b/datafusion/datasource-json/src/file_format.rs
index 51f4bd7e96..589ad6bea9 100644
--- a/datafusion/datasource-json/src/file_format.rs
+++ b/datafusion/datasource-json/src/file_format.rs
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions
+//! [`JsonFormat`]: Line delimited and array JSON [`FileFormat`] abstractions
 
 use std::any::Any;
 use std::collections::HashMap;
 use std::fmt;
 use std::fmt::Debug;
-use std::io::BufReader;
+use std::io::{BufReader, Read};
 use std::sync::Arc;
 
 use crate::source::JsonSource;
@@ -47,6 +47,8 @@ use datafusion_datasource::file_format::{
 use datafusion_datasource::file_scan_config::{FileScanConfig, 
FileScanConfigBuilder};
 use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
 use datafusion_datasource::sink::{DataSink, DataSinkExec};
+
+use datafusion_datasource::source::DataSourceExec;
 use datafusion_datasource::write::demux::DemuxedStreamReceiver;
 use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
 use datafusion_datasource::write::BatchSerializer;
@@ -58,7 +60,6 @@ use datafusion_session::Session;
 
 use async_trait::async_trait;
 use bytes::{Buf, Bytes};
-use datafusion_datasource::source::DataSourceExec;
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
 
 #[derive(Default)]
@@ -131,7 +132,23 @@ impl Debug for JsonFormatFactory {
     }
 }
 
-/// New line delimited JSON `FileFormat` implementation.
+/// JSON `FileFormat` implementation supporting both line-delimited and array 
formats.
+///
+/// # Supported Formats
+///
+/// ## Line-Delimited JSON (default)
+/// ```text
+/// {"key1": 1, "key2": "val"}
+/// {"key1": 2, "key2": "vals"}
+/// ```
+///
+/// ## JSON Array Format (when `format_array` option is true)
+/// ```text
+/// [
+///     {"key1": 1, "key2": "val"},
+///     {"key1": 2, "key2": "vals"}
+/// ]
+/// ```
 #[derive(Debug, Default)]
 pub struct JsonFormat {
     options: JsonOptions,
@@ -165,6 +182,49 @@ impl JsonFormat {
         self.options.compression = file_compression_type.into();
         self
     }
+
+    /// Set whether to expect JSON array format instead of line-delimited 
format.
+    ///
+    /// When `true`, expects input like: `[{"a": 1}, {"a": 2}]`
+    /// When `false` (default), expects input like:
+    /// ```text
+    /// {"a": 1}
+    /// {"a": 2}
+    /// ```
+    pub fn with_format_array(mut self, format_array: bool) -> Self {
+        self.options.format_array = format_array;
+        self
+    }
+}
+
+/// Infer schema from a JSON array format file.
+///
+/// This function reads JSON data in array format `[{...}, {...}]` and infers
+/// the Arrow schema from the contained objects.
+fn infer_json_schema_from_json_array<R: Read>(
+    reader: &mut R,
+    max_records: usize,
+) -> std::result::Result<Schema, ArrowError> {
+    let mut content = String::new();
+    reader.read_to_string(&mut content).map_err(|e| {
+        ArrowError::JsonError(format!("Failed to read JSON content: {e}"))
+    })?;
+
+    // Parse as JSON array using serde_json
+    let values: Vec<serde_json::Value> = serde_json::from_str(&content)
+        .map_err(|e| ArrowError::JsonError(format!("Failed to parse JSON 
array: {e}")))?;
+
+    // Take only max_records for schema inference
+    let values_to_infer: Vec<_> = 
values.into_iter().take(max_records).collect();
+
+    if values_to_infer.is_empty() {
+        return Err(ArrowError::JsonError(
+            "JSON array is empty, cannot infer schema".to_string(),
+        ));
+    }
+
+    // Use arrow's schema inference on the parsed values
+    infer_json_schema_from_iterator(values_to_infer.into_iter().map(Ok))
 }
 
 #[async_trait]
@@ -201,6 +261,8 @@ impl FileFormat for JsonFormat {
             .schema_infer_max_rec
             .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
         let file_compression_type = 
FileCompressionType::from(self.options.compression);
+        let is_array_format = self.options.format_array;
+
         for object in objects {
             let mut take_while = || {
                 let should_take = records_to_read > 0;
@@ -216,15 +278,29 @@ impl FileFormat for JsonFormat {
                 GetResultPayload::File(file, _) => {
                     let decoder = file_compression_type.convert_read(file)?;
                     let mut reader = BufReader::new(decoder);
-                    let iter = ValueIter::new(&mut reader, None);
-                    infer_json_schema_from_iterator(iter.take_while(|_| 
take_while()))?
+
+                    if is_array_format {
+                        infer_json_schema_from_json_array(&mut reader, 
records_to_read)?
+                    } else {
+                        let iter = ValueIter::new(&mut reader, None);
+                        infer_json_schema_from_iterator(
+                            iter.take_while(|_| take_while()),
+                        )?
+                    }
                 }
                 GetResultPayload::Stream(_) => {
                     let data = r.bytes().await?;
                     let decoder = 
file_compression_type.convert_read(data.reader())?;
                     let mut reader = BufReader::new(decoder);
-                    let iter = ValueIter::new(&mut reader, None);
-                    infer_json_schema_from_iterator(iter.take_while(|_| 
take_while()))?
+
+                    if is_array_format {
+                        infer_json_schema_from_json_array(&mut reader, 
records_to_read)?
+                    } else {
+                        let iter = ValueIter::new(&mut reader, None);
+                        infer_json_schema_from_iterator(
+                            iter.take_while(|_| take_while()),
+                        )?
+                    }
                 }
             };
 
@@ -253,7 +329,8 @@ impl FileFormat for JsonFormat {
         _state: &dyn Session,
         conf: FileScanConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let source = Arc::new(JsonSource::new());
+        let source =
+            
Arc::new(JsonSource::new().with_format_array(self.options.format_array));
         let conf = FileScanConfigBuilder::from(conf)
             .with_file_compression_type(FileCompressionType::from(
                 self.options.compression,
@@ -282,7 +359,7 @@ impl FileFormat for JsonFormat {
     }
 
     fn file_source(&self) -> Arc<dyn FileSource> {
-        Arc::new(JsonSource::default())
+        
Arc::new(JsonSource::new().with_format_array(self.options.format_array))
     }
 }
 
diff --git a/datafusion/datasource-json/src/source.rs 
b/datafusion/datasource-json/src/source.rs
index 52ed0def03..d1107b0b97 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -36,6 +36,7 @@ use datafusion_datasource::{
 };
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
+use arrow::array::RecordBatch;
 use arrow::json::ReaderBuilder;
 use arrow::{datatypes::SchemaRef, json};
 use datafusion_common::Statistics;
@@ -55,6 +56,7 @@ pub struct JsonOpener {
     projected_schema: SchemaRef,
     file_compression_type: FileCompressionType,
     object_store: Arc<dyn ObjectStore>,
+    format_array: bool,
 }
 
 impl JsonOpener {
@@ -64,12 +66,14 @@ impl JsonOpener {
         projected_schema: SchemaRef,
         file_compression_type: FileCompressionType,
         object_store: Arc<dyn ObjectStore>,
+        format_array: bool,
     ) -> Self {
         Self {
             batch_size,
             projected_schema,
             file_compression_type,
             object_store,
+            format_array,
         }
     }
 }
@@ -81,6 +85,7 @@ pub struct JsonSource {
     metrics: ExecutionPlanMetricsSet,
     projected_statistics: Option<Statistics>,
     schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
+    format_array: bool,
 }
 
 impl JsonSource {
@@ -88,6 +93,12 @@ impl JsonSource {
     pub fn new() -> Self {
         Self::default()
     }
+
+    /// Set whether to expect JSON array format
+    pub fn with_format_array(mut self, format_array: bool) -> Self {
+        self.format_array = format_array;
+        self
+    }
 }
 
 impl From<JsonSource> for Arc<dyn FileSource> {
@@ -110,6 +121,7 @@ impl FileSource for JsonSource {
             projected_schema: base_config.projected_file_schema(),
             file_compression_type: base_config.file_compression_type,
             object_store,
+            format_array: self.format_array,
         })
     }
 
@@ -181,6 +193,16 @@ impl FileOpener for JsonOpener {
         let schema = Arc::clone(&self.projected_schema);
         let batch_size = self.batch_size;
         let file_compression_type = self.file_compression_type.to_owned();
+        let format_array = self.format_array;
+
+        // JSON array format requires reading the complete file
+        if format_array && partitioned_file.range.is_some() {
+            return Err(DataFusionError::NotImplemented(
+                "JSON array format does not support range-based file scanning. 
\
+                 Disable repartition_file_scans or use line-delimited JSON 
format."
+                    .to_string(),
+            ));
+        }
 
         Ok(Box::pin(async move {
             let calculated_range =
@@ -217,33 +239,94 @@ impl FileOpener for JsonOpener {
                         }
                     };
 
-                    let reader = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build(BufReader::new(bytes))?;
-
-                    Ok(futures::stream::iter(reader)
-                        .map(|r| r.map_err(Into::into))
-                        .boxed())
+                    if format_array {
+                        // Handle JSON array format
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(bytes),
+                            schema,
+                            batch_size,
+                        )?;
+                        
Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    } else {
+                        let reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(BufReader::new(bytes))?;
+                        Ok(futures::stream::iter(reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    }
                 }
                 GetResultPayload::Stream(s) => {
-                    let s = s.map_err(DataFusionError::from);
-
-                    let decoder = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build_decoder()?;
-                    let input = 
file_compression_type.convert_stream(s.boxed())?.fuse();
-
-                    let stream = deserialize_stream(
-                        input,
-                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                    );
-                    Ok(stream.map_err(Into::into).boxed())
+                    if format_array {
+                        // For streaming, we need to collect all bytes first
+                        let bytes = s
+                            .map_err(DataFusionError::from)
+                            .try_fold(Vec::new(), |mut acc, chunk| async move {
+                                acc.extend_from_slice(&chunk);
+                                Ok(acc)
+                            })
+                            .await?;
+                        let decompressed = file_compression_type
+                            .convert_read(std::io::Cursor::new(bytes))?;
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(decompressed),
+                            schema,
+                            batch_size,
+                        )?;
+                        
Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    } else {
+                        let s = s.map_err(DataFusionError::from);
+                        let decoder = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build_decoder()?;
+                        let input =
+                            
file_compression_type.convert_stream(s.boxed())?.fuse();
+                        let stream = deserialize_stream(
+                            input,
+                            
DecoderDeserializer::new(JsonDecoder::new(decoder)),
+                        );
+                        Ok(stream.map_err(Into::into).boxed())
+                    }
                 }
             }
         }))
     }
 }
 
+/// Read JSON array format and convert to RecordBatches
+fn read_json_array_to_batches<R: Read>(
+    mut reader: R,
+    schema: SchemaRef,
+    batch_size: usize,
+) -> Result<Vec<RecordBatch>> {
+    use arrow::json::ReaderBuilder;
+
+    let mut content = String::new();
+    reader.read_to_string(&mut content)?;
+
+    // Parse JSON array
+    let values: Vec<serde_json::Value> = serde_json::from_str(&content)
+        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+    if values.is_empty() {
+        return Ok(vec![RecordBatch::new_empty(schema)]);
+    }
+
+    // Convert to NDJSON string for arrow-json reader
+    let ndjson: String = values
+        .iter()
+        .map(|v| v.to_string())
+        .collect::<Vec<_>>()
+        .join("\n");
+
+    let cursor = std::io::Cursor::new(ndjson);
+    let reader = ReaderBuilder::new(schema)
+        .with_batch_size(batch_size)
+        .build(cursor)?;
+
+    reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
+}
+
 pub async fn plan_to_json(
     task_ctx: Arc<TaskContext>,
     plan: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/proto-common/proto/datafusion_common.proto 
b/datafusion/proto-common/proto/datafusion_common.proto
index 267953556b..c598b8dcb6 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -467,6 +467,8 @@ message CsvOptions {
 message JsonOptions {
   CompressionTypeVariant compression = 1; // Compression type
   optional uint64 schema_infer_max_rec = 2; // Optional max records for schema 
inference
+  optional uint32 compression_level = 3; // Optional compression level
+  bool format_array = 4; // Whether the JSON is in array format [{},...] 
(default false = line-delimited)
 }
 
 message TableParquetOptions {
diff --git a/datafusion/proto-common/src/from_proto/mod.rs 
b/datafusion/proto-common/src/from_proto/mod.rs
index 4ede5b970e..71f8cc3e78 100644
--- a/datafusion/proto-common/src/from_proto/mod.rs
+++ b/datafusion/proto-common/src/from_proto/mod.rs
@@ -1092,6 +1092,8 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
         Ok(JsonOptions {
             compression: compression.into(),
             schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as 
usize),
+            compression_level: proto_opts.compression_level.map(|h| h as 
usize),
+            format_array: proto_opts.format_array,
         })
     }
 }
diff --git a/datafusion/proto-common/src/generated/pbjson.rs 
b/datafusion/proto-common/src/generated/pbjson.rs
index e63f345459..cc646e48b0 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -4548,6 +4548,12 @@ impl serde::Serialize for JsonOptions {
         if self.schema_infer_max_rec.is_some() {
             len += 1;
         }
+        if self.compression_level.is_some() {
+            len += 1;
+        }
+        if self.format_array {
+            len += 1;
+        }
         let mut struct_ser = 
serializer.serialize_struct("datafusion_common.JsonOptions", len)?;
         if self.compression != 0 {
             let v = CompressionTypeVariant::try_from(self.compression)
@@ -4559,6 +4565,12 @@ impl serde::Serialize for JsonOptions {
             #[allow(clippy::needless_borrows_for_generic_args)]
             struct_ser.serialize_field("schemaInferMaxRec", 
ToString::to_string(&v).as_str())?;
         }
+        if let Some(v) = self.compression_level.as_ref() {
+            struct_ser.serialize_field("compressionLevel", v)?;
+        }
+        if self.format_array {
+            struct_ser.serialize_field("formatArray", &self.format_array)?;
+        }
         struct_ser.end()
     }
 }
@@ -4572,12 +4584,18 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
             "compression",
             "schema_infer_max_rec",
             "schemaInferMaxRec",
+            "compression_level",
+            "compressionLevel",
+            "format_array",
+            "formatArray",
         ];
 
         #[allow(clippy::enum_variant_names)]
         enum GeneratedField {
             Compression,
             SchemaInferMaxRec,
+            CompressionLevel,
+            FormatArray,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -4601,6 +4619,8 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
                         match value {
                             "compression" => Ok(GeneratedField::Compression),
                             "schemaInferMaxRec" | "schema_infer_max_rec" => 
Ok(GeneratedField::SchemaInferMaxRec),
+                            "compressionLevel" | "compression_level" => 
Ok(GeneratedField::CompressionLevel),
+                            "formatArray" | "format_array" => 
Ok(GeneratedField::FormatArray),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -4622,6 +4642,8 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
             {
                 let mut compression__ = None;
                 let mut schema_infer_max_rec__ = None;
+                let mut compression_level__ = None;
+                let mut format_array__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::Compression => {
@@ -4638,11 +4660,27 @@ impl<'de> serde::Deserialize<'de> for JsonOptions {
                                 
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 x.0)
                             ;
                         }
+                        GeneratedField::CompressionLevel => {
+                            if compression_level__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("compressionLevel"));
+                            }
+                            compression_level__ = 
+                                
map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x|
 x.0)
+                            ;
+                        }
+                        GeneratedField::FormatArray => {
+                            if format_array__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("formatArray"));
+                            }
+                            format_array__ = Some(map_.next_value()?);
+                        }
                     }
                 }
                 Ok(JsonOptions {
                     compression: compression__.unwrap_or_default(),
                     schema_infer_max_rec: schema_infer_max_rec__,
+                    compression_level: compression_level__,
+                    format_array: format_array__.unwrap_or_default(),
                 })
             }
         }
diff --git a/datafusion/proto-common/src/generated/prost.rs 
b/datafusion/proto-common/src/generated/prost.rs
index aa7c3d51a9..24d6600e4d 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -659,6 +659,12 @@ pub struct JsonOptions {
     /// Optional max records for schema inference
     #[prost(uint64, optional, tag = "2")]
     pub schema_infer_max_rec: ::core::option::Option<u64>,
+    /// Optional compression level
+    #[prost(uint32, optional, tag = "3")]
+    pub compression_level: ::core::option::Option<u32>,
+    /// Whether the JSON is in array format \[{},...\] (default false = 
line-delimited)
+    #[prost(bool, tag = "4")]
+    pub format_array: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TableParquetOptions {
diff --git a/datafusion/proto-common/src/to_proto/mod.rs 
b/datafusion/proto-common/src/to_proto/mod.rs
index e9de1d9e9a..69f3c269c7 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -986,6 +986,8 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions {
         Ok(protobuf::JsonOptions {
             compression: compression.into(),
             schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64),
+            compression_level: opts.compression_level.map(|h| h as u32),
+            format_array: opts.format_array,
         })
     }
 }
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs 
b/datafusion/proto/src/generated/datafusion_proto_common.rs
index aa7c3d51a9..24d6600e4d 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -659,6 +659,12 @@ pub struct JsonOptions {
     /// Optional max records for schema inference
     #[prost(uint64, optional, tag = "2")]
     pub schema_infer_max_rec: ::core::option::Option<u64>,
+    /// Optional compression level
+    #[prost(uint32, optional, tag = "3")]
+    pub compression_level: ::core::option::Option<u32>,
+    /// Whether the JSON is in array format \[{},...\] (default false = 
line-delimited)
+    #[prost(bool, tag = "4")]
+    pub format_array: bool,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct TableParquetOptions {
diff --git a/datafusion/proto/src/logical_plan/file_formats.rs 
b/datafusion/proto/src/logical_plan/file_formats.rs
index d32bfb22ff..3685e61542 100644
--- a/datafusion/proto/src/logical_plan/file_formats.rs
+++ b/datafusion/proto/src/logical_plan/file_formats.rs
@@ -238,6 +238,8 @@ impl JsonOptionsProto {
             JsonOptionsProto {
                 compression: options.compression as i32,
                 schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v 
as u64),
+                compression_level: options.compression_level.map(|v| v as u32),
+                format_array: options.format_array,
             }
         } else {
             JsonOptionsProto::default()
@@ -256,6 +258,8 @@ impl From<&JsonOptionsProto> for JsonOptions {
                 _ => CompressionTypeVariant::UNCOMPRESSED,
             },
             schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as 
usize),
+            compression_level: proto.compression_level.map(|v| v as usize),
+            format_array: proto.format_array,
         }
     }
 }
diff --git a/datafusion/sqllogictest/test_files/json.slt 
b/datafusion/sqllogictest/test_files/json.slt
index b46b8c49d6..4442a6a2d5 100644
--- a/datafusion/sqllogictest/test_files/json.slt
+++ b/datafusion/sqllogictest/test_files/json.slt
@@ -146,3 +146,31 @@ EXPLAIN SELECT id FROM json_partitioned_test WHERE part = 2
 ----
 logical_plan TableScan: json_partitioned_test projection=[id], 
full_filters=[json_partitioned_test.part = Int32(2)]
 physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_json/part=2/data.json]]},
 projection=[id], file_type=json
+
+##########
+## JSON Array Format Tests
+##########
+
+# Test reading JSON array format file with format_array=true
+statement ok
+CREATE EXTERNAL TABLE json_array_test
+STORED AS JSON
+OPTIONS ('format.format_array' 'true')
+LOCATION '../core/tests/data/json_array.json';
+
+query IT rowsort
+SELECT a, b FROM json_array_test
+----
+1 hello
+2 world
+3 test
+
+statement ok
+DROP TABLE json_array_test;
+
+# Test that reading JSON array format WITHOUT format_array option fails
+# (default is line-delimited mode which can't parse array format correctly)
+statement error Not valid JSON
+CREATE EXTERNAL TABLE json_array_as_ndjson
+STORED AS JSON
+LOCATION '../core/tests/data/json_array.json';
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to