This is an automated email from the ASF dual-hosted git repository.
jeffreyvo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 000353df20 Add support for de/serializing list-encoded JSON structs
[#6558] (#6643)
000353df20 is described below
commit 000353df20b2b586a0c8dcb4be1721469645a15a
Author: James Gill <[email protected]>
AuthorDate: Sun Jan 26 17:35:22 2025 -0500
Add support for de/serializing list-encoded JSON structs [#6558] (#6643)
Currently, a StructArray can only be deserialized from or serialized to
a JSON object (e.g. `{a: 1, b: "c"}`), but some services (e.g. Presto
and Trino) encode ROW types as JSON lists (e.g. `[1, "c"]`) because this
is more compact, and the schema is known.
This PR adds the ability to encode and decode JSON lists from and to
StructArrays, if StructMode is set to ListOnly. In ListOnly mode,
object-encoded structs raise an error. Setting to ObjectOnly (the
default) has the original parsing behavior.
Some notes/questions/points for discussion:
1. I've made a JsonParseMode struct instead of a bool flag for two
reasons. One is that it's self-descriptive (what would `true` be?),
and the other is that it allows a future Mixed mode that could
deserialize either. The latter isn't currently requested by anyone.
2. I kept the error messages as similar to the old messages as possible.
I considered having more specific error messages (like "Encountered a
'[' when parsing a Struct, but the StructParseMode is ObjectOnly" or
similar), but wanted to hear opinions before I went that route.
3. I'm not attached to any name/code-style/etc, so happy to modify to
fit local conventions.
Fixes #6558
---
arrow-json/src/lib.rs | 97 +++++++++++
arrow-json/src/reader/list_array.rs | 3 +
arrow-json/src/reader/map_array.rs | 4 +
arrow-json/src/reader/mod.rs | 301 +++++++++++++++++++++++++++++++++-
arrow-json/src/reader/struct_array.rs | 111 +++++++++----
arrow-json/src/writer/encoder.rs | 24 ++-
arrow-json/src/writer/mod.rs | 85 +++++++++-
7 files changed, 579 insertions(+), 46 deletions(-)
diff --git a/arrow-json/src/lib.rs b/arrow-json/src/lib.rs
index b6c441012b..bb02d0a317 100644
--- a/arrow-json/src/lib.rs
+++ b/arrow-json/src/lib.rs
@@ -74,6 +74,35 @@ pub use self::writer::{ArrayWriter, LineDelimitedWriter,
Writer, WriterBuilder};
use half::f16;
use serde_json::{Number, Value};
+/// Specifies what is considered valid JSON when reading or writing
+/// RecordBatches or StructArrays.
+///
+/// This enum controls which form(s) the Reader will accept and which form the
+/// Writer will produce. For example, if the RecordBatch Schema is
+/// `[("a", Int32), ("r", Struct([("b", Boolean), ("c", Utf8)]))]`
+/// then a Reader with [`StructMode::ObjectOnly`] would read rows of the form
+/// `{"a": 1, "r": {"b": true, "c": "cat"}}` while with
[`StructMode::ListOnly`]
+/// would read rows of the form `[1, [true, "cat"]]`. A Writer would produce
+/// rows formatted similarly.
+///
+/// The list encoding is more compact if the schema is known, and is used by
+/// tools such as [Presto] and [Trino].
+///
+/// When reading objects, the order of the keys does not matter. When reading
+/// lists, the entries must be the same number and in the same order as the
+/// struct fields. Map columns are not affected by this option.
+///
+/// [Presto]:
(https://prestodb.io/docs/current/develop/client-protocol.html#important-queryresults-attributes)
+/// [Trino]:
(https://trino.io/docs/current/develop/client-protocol.html#important-queryresults-attributes)
+#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
+pub enum StructMode {
+ #[default]
+ /// Encode/decode structs as objects (e.g., {"a": 1, "b": "c"})
+ ObjectOnly,
+ /// Encode/decode structs as lists (e.g., [1, "c"])
+ ListOnly,
+}
+
/// Trait declaring any type that is serializable to JSON. This includes all
primitive types (bool, i32, etc.).
pub trait JsonSerializable: 'static {
/// Converts self into json value if its possible
@@ -156,4 +185,72 @@ mod tests {
);
assert_eq!(None, f32::NAN.into_json_value());
}
+
+ #[test]
+ fn test_json_roundtrip_structs() {
+ use crate::writer::LineDelimited;
+ use arrow_schema::DataType;
+ use arrow_schema::Field;
+ use arrow_schema::Fields;
+ use arrow_schema::Schema;
+ use std::sync::Arc;
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new(
+ "c1",
+ DataType::Struct(Fields::from(vec![
+ Field::new("c11", DataType::Int32, true),
+ Field::new(
+ "c12",
+ DataType::Struct(vec![Field::new("c121",
DataType::Utf8, false)].into()),
+ false,
+ ),
+ ])),
+ false,
+ ),
+ Field::new("c2", DataType::Utf8, false),
+ ]));
+
+ {
+ let object_input = r#"{"c1":{"c11":1,"c12":{"c121":"e"}},"c2":"a"}
+{"c1":{"c12":{"c121":"f"}},"c2":"b"}
+{"c1":{"c11":5,"c12":{"c121":"g"}},"c2":"c"}
+"#
+ .as_bytes();
+ let object_reader = ReaderBuilder::new(schema.clone())
+ .with_struct_mode(StructMode::ObjectOnly)
+ .build(object_input)
+ .unwrap();
+
+ let mut object_output: Vec<u8> = Vec::new();
+ let mut object_writer = WriterBuilder::new()
+ .with_struct_mode(StructMode::ObjectOnly)
+ .build::<_, LineDelimited>(&mut object_output);
+ for batch_res in object_reader {
+ object_writer.write(&batch_res.unwrap()).unwrap();
+ }
+ assert_eq!(object_input, &object_output);
+ }
+
+ {
+ let list_input = r#"[[1,["e"]],"a"]
+[[null,["f"]],"b"]
+[[5,["g"]],"c"]
+"#
+ .as_bytes();
+ let list_reader = ReaderBuilder::new(schema.clone())
+ .with_struct_mode(StructMode::ListOnly)
+ .build(list_input)
+ .unwrap();
+
+ let mut list_output: Vec<u8> = Vec::new();
+ let mut list_writer = WriterBuilder::new()
+ .with_struct_mode(StructMode::ListOnly)
+ .build::<_, LineDelimited>(&mut list_output);
+ for batch_res in list_reader {
+ list_writer.write(&batch_res.unwrap()).unwrap();
+ }
+ assert_eq!(list_input, &list_output);
+ }
+ }
}
diff --git a/arrow-json/src/reader/list_array.rs
b/arrow-json/src/reader/list_array.rs
index b6f8c18ea9..1a1dee6a23 100644
--- a/arrow-json/src/reader/list_array.rs
+++ b/arrow-json/src/reader/list_array.rs
@@ -17,6 +17,7 @@
use crate::reader::tape::{Tape, TapeElement};
use crate::reader::{make_decoder, ArrayDecoder};
+use crate::StructMode;
use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
use arrow_array::OffsetSizeTrait;
use arrow_buffer::buffer::NullBuffer;
@@ -37,6 +38,7 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
+ struct_mode: StructMode,
) -> Result<Self, ArrowError> {
let field = match &data_type {
DataType::List(f) if !O::IS_LARGE => f,
@@ -48,6 +50,7 @@ impl<O: OffsetSizeTrait> ListArrayDecoder<O> {
coerce_primitive,
strict_mode,
field.is_nullable(),
+ struct_mode,
)?;
Ok(Self {
diff --git a/arrow-json/src/reader/map_array.rs
b/arrow-json/src/reader/map_array.rs
index cd1ca5f71f..ee78373a55 100644
--- a/arrow-json/src/reader/map_array.rs
+++ b/arrow-json/src/reader/map_array.rs
@@ -17,6 +17,7 @@
use crate::reader::tape::{Tape, TapeElement};
use crate::reader::{make_decoder, ArrayDecoder};
+use crate::StructMode;
use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::ArrowNativeType;
@@ -36,6 +37,7 @@ impl MapArrayDecoder {
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
+ struct_mode: StructMode,
) -> Result<Self, ArrowError> {
let fields = match &data_type {
DataType::Map(_, true) => {
@@ -59,12 +61,14 @@ impl MapArrayDecoder {
coerce_primitive,
strict_mode,
fields[0].is_nullable(),
+ struct_mode,
)?;
let values = make_decoder(
fields[1].data_type().clone(),
coerce_primitive,
strict_mode,
fields[1].is_nullable(),
+ struct_mode,
)?;
Ok(Self {
diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs
index f857e8813c..704fdbeb95 100644
--- a/arrow-json/src/reader/mod.rs
+++ b/arrow-json/src/reader/mod.rs
@@ -133,6 +133,7 @@
//! ```
//!
+use crate::StructMode;
use std::io::BufRead;
use std::sync::Arc;
@@ -176,6 +177,7 @@ pub struct ReaderBuilder {
coerce_primitive: bool,
strict_mode: bool,
is_field: bool,
+ struct_mode: StructMode,
schema: SchemaRef,
}
@@ -195,6 +197,7 @@ impl ReaderBuilder {
coerce_primitive: false,
strict_mode: false,
is_field: false,
+ struct_mode: Default::default(),
schema,
}
}
@@ -235,6 +238,7 @@ impl ReaderBuilder {
coerce_primitive: false,
strict_mode: false,
is_field: true,
+ struct_mode: Default::default(),
schema: Arc::new(Schema::new([field.into()])),
}
}
@@ -253,8 +257,11 @@ impl ReaderBuilder {
}
}
- /// Sets if the decoder should return an error if it encounters a column
not present
- /// in `schema`
+ /// Sets if the decoder should return an error if it encounters a column
not
+ /// present in `schema`. If `struct_mode` is `ListOnly` the value of
+ /// `strict_mode` is effectively `true`. It is required for all fields of
+ /// the struct to be in the list: without field names, there is no way to
+ /// determine which field is missing.
pub fn with_strict_mode(self, strict_mode: bool) -> Self {
Self {
strict_mode,
@@ -262,6 +269,16 @@ impl ReaderBuilder {
}
}
+ /// Set the [`StructMode`] for the reader, which determines whether structs
+ /// can be decoded from JSON as objects or lists. For more details refer to
+ /// the enum documentation. Default is to use `ObjectOnly`.
+ pub fn with_struct_mode(self, struct_mode: StructMode) -> Self {
+ Self {
+ struct_mode,
+ ..self
+ }
+ }
+
/// Create a [`Reader`] with the provided [`BufRead`]
pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError>
{
Ok(Reader {
@@ -280,7 +297,13 @@ impl ReaderBuilder {
}
};
- let decoder = make_decoder(data_type, self.coerce_primitive,
self.strict_mode, nullable)?;
+ let decoder = make_decoder(
+ data_type,
+ self.coerce_primitive,
+ self.strict_mode,
+ nullable,
+ self.struct_mode,
+ )?;
let num_fields = self.schema.flattened_fields().len();
@@ -643,6 +666,7 @@ fn make_decoder(
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
+ struct_mode: StructMode,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
downcast_integer! {
data_type => (primitive_decoder, data_type),
@@ -693,13 +717,13 @@ fn make_decoder(
DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
DataType::Utf8 =>
Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
DataType::LargeUtf8 =>
Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
- DataType::List(_) =>
Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive,
strict_mode, is_nullable)?)),
- DataType::LargeList(_) =>
Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive,
strict_mode, is_nullable)?)),
- DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type,
coerce_primitive, strict_mode, is_nullable)?)),
+ DataType::List(_) =>
Ok(Box::new(ListArrayDecoder::<i32>::new(data_type, coerce_primitive,
strict_mode, is_nullable, struct_mode)?)),
+ DataType::LargeList(_) =>
Ok(Box::new(ListArrayDecoder::<i64>::new(data_type, coerce_primitive,
strict_mode, is_nullable, struct_mode)?)),
+ DataType::Struct(_) => Ok(Box::new(StructArrayDecoder::new(data_type,
coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
DataType::Binary | DataType::LargeBinary |
DataType::FixedSizeBinary(_) => {
Err(ArrowError::JsonError(format!("{data_type} is not supported by
JSON")))
}
- DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type,
coerce_primitive, strict_mode, is_nullable)?)),
+ DataType::Map(_, _) => Ok(Box::new(MapArrayDecoder::new(data_type,
coerce_primitive, strict_mode, is_nullable, struct_mode)?)),
d => Err(ArrowError::NotYetImplemented(format!("Support for {d} in
JSON reader")))
}
}
@@ -715,7 +739,7 @@ mod tests {
use arrow_buffer::{ArrowNativeType, Buffer};
use arrow_cast::display::{ArrayFormatter, FormatOptions};
use arrow_data::ArrayDataBuilder;
- use arrow_schema::Field;
+ use arrow_schema::{Field, Fields};
use super::*;
@@ -2343,4 +2367,265 @@ mod tests {
.unwrap()
);
}
+
+ // Parse the given `row` in `struct_mode` as a type given by fields.
+ //
+ // If as_struct == true, wrap the fields in a Struct field with name "r".
+ // If as_struct == false, wrap the fields in a Schema.
+ fn _parse_structs(
+ row: &str,
+ struct_mode: StructMode,
+ fields: Fields,
+ as_struct: bool,
+ ) -> Result<RecordBatch, ArrowError> {
+ let builder = if as_struct {
+ ReaderBuilder::new_with_field(Field::new("r",
DataType::Struct(fields), true))
+ } else {
+ ReaderBuilder::new(Arc::new(Schema::new(fields)))
+ };
+ builder
+ .with_struct_mode(struct_mode)
+ .build(Cursor::new(row.as_bytes()))
+ .unwrap()
+ .next()
+ .unwrap()
+ }
+
+ #[test]
+ fn test_struct_decoding_list_length() {
+ use arrow_array::array;
+
+ let row = "[1, 2]";
+
+ let mut fields = vec![Field::new("a", DataType::Int32, true)];
+ let too_few_fields = Fields::from(fields.clone());
+ fields.push(Field::new("b", DataType::Int32, true));
+ let correct_fields = Fields::from(fields.clone());
+ fields.push(Field::new("c", DataType::Int32, true));
+ let too_many_fields = Fields::from(fields.clone());
+
+ let parse = |fields: Fields, as_struct: bool| {
+ _parse_structs(row, StructMode::ListOnly, fields, as_struct)
+ };
+
+ let expected_row = StructArray::new(
+ correct_fields.clone(),
+ vec![
+ Arc::new(array::Int32Array::from(vec![1])),
+ Arc::new(array::Int32Array::from(vec![2])),
+ ],
+ None,
+ );
+ let row_field = Field::new("r",
DataType::Struct(correct_fields.clone()), true);
+
+ assert_eq!(
+ parse(too_few_fields.clone(), true).unwrap_err().to_string(),
+ "Json error: found extra columns for 1 fields".to_string()
+ );
+ assert_eq!(
+ parse(too_few_fields, false).unwrap_err().to_string(),
+ "Json error: found extra columns for 1 fields".to_string()
+ );
+ assert_eq!(
+ parse(correct_fields.clone(), true).unwrap(),
+ RecordBatch::try_new(
+ Arc::new(Schema::new(vec![row_field])),
+ vec![Arc::new(expected_row.clone())]
+ )
+ .unwrap()
+ );
+ assert_eq!(
+ parse(correct_fields, false).unwrap(),
+ RecordBatch::from(expected_row)
+ );
+ assert_eq!(
+ parse(too_many_fields.clone(), true)
+ .unwrap_err()
+ .to_string(),
+ "Json error: found 2 columns for 3 fields".to_string()
+ );
+ assert_eq!(
+ parse(too_many_fields, false).unwrap_err().to_string(),
+ "Json error: found 2 columns for 3 fields".to_string()
+ );
+ }
+
+ #[test]
+ fn test_struct_decoding() {
+ use arrow_array::builder;
+
+ let nested_object_json = r#"{"a": {"b": [1, 2], "c": {"d": 3}}}"#;
+ let nested_list_json = r#"[[[1, 2], {"d": 3}]]"#;
+ let nested_mixed_json = r#"{"a": [[1, 2], {"d": 3}]}"#;
+
+ let struct_fields = Fields::from(vec![
+ Field::new("b", DataType::new_list(DataType::Int32, true), true),
+ Field::new_map(
+ "c",
+ "entries",
+ Field::new("keys", DataType::Utf8, false),
+ Field::new("values", DataType::Int32, true),
+ false,
+ false,
+ ),
+ ]);
+
+ let list_array =
+ ListArray::from_iter_primitive::<Int32Type, _,
_>(vec![Some(vec![Some(1), Some(2)])]);
+
+ let map_array = {
+ let mut map_builder = builder::MapBuilder::new(
+ None,
+ builder::StringBuilder::new(),
+ builder::Int32Builder::new(),
+ );
+ map_builder.keys().append_value("d");
+ map_builder.values().append_value(3);
+ map_builder.append(true).unwrap();
+ map_builder.finish()
+ };
+
+ let struct_array = StructArray::new(
+ struct_fields.clone(),
+ vec![Arc::new(list_array), Arc::new(map_array)],
+ None,
+ );
+
+ let fields = Fields::from(vec![Field::new("a",
DataType::Struct(struct_fields), true)]);
+ let schema = Arc::new(Schema::new(fields.clone()));
+ let expected = RecordBatch::try_new(schema.clone(),
vec![Arc::new(struct_array)]).unwrap();
+
+ let parse = |row: &str, struct_mode: StructMode| {
+ _parse_structs(row, struct_mode, fields.clone(), false)
+ };
+
+ assert_eq!(
+ parse(nested_object_json, StructMode::ObjectOnly).unwrap(),
+ expected
+ );
+ assert_eq!(
+ parse(nested_list_json, StructMode::ObjectOnly)
+ .unwrap_err()
+ .to_string(),
+ "Json error: expected { got [[[1, 2], {\"d\": 3}]]".to_owned()
+ );
+ assert_eq!(
+ parse(nested_mixed_json, StructMode::ObjectOnly)
+ .unwrap_err()
+ .to_string(),
+ "Json error: whilst decoding field 'a': expected { got [[1, 2],
{\"d\": 3}]".to_owned()
+ );
+
+ assert_eq!(
+ parse(nested_list_json, StructMode::ListOnly).unwrap(),
+ expected
+ );
+ assert_eq!(
+ parse(nested_object_json, StructMode::ListOnly)
+ .unwrap_err()
+ .to_string(),
+ "Json error: expected [ got {\"a\": {\"b\": [1, 2]\"c\": {\"d\":
3}}}".to_owned()
+ );
+ assert_eq!(
+ parse(nested_mixed_json, StructMode::ListOnly)
+ .unwrap_err()
+ .to_string(),
+ "Json error: expected [ got {\"a\": [[1, 2], {\"d\":
3}]}".to_owned()
+ );
+ }
+
+ // Test cases:
+ // [] -> RecordBatch row with no entries. Schema = [('a', Int32)] -> Error
+ // [] -> RecordBatch row with no entries. Schema = [('r', [('a', Int32)])]
-> Error
+ // [] -> StructArray row with no entries. Fields [('a', Int32')] -> Error
+ // [[]] -> RecordBatch row with empty struct entry. Schema = [('r', [('a',
Int32)])] -> Error
+ #[test]
+ fn test_struct_decoding_empty_list() {
+ let int_field = Field::new("a", DataType::Int32, true);
+ let struct_field = Field::new(
+ "r",
+ DataType::Struct(Fields::from(vec![int_field.clone()])),
+ true,
+ );
+
+ let parse = |row: &str, as_struct: bool, field: Field| {
+ _parse_structs(
+ row,
+ StructMode::ListOnly,
+ Fields::from(vec![field]),
+ as_struct,
+ )
+ };
+
+ // Missing fields
+ assert_eq!(
+ parse("[]", true, struct_field.clone())
+ .unwrap_err()
+ .to_string(),
+ "Json error: found 0 columns for 1 fields".to_owned()
+ );
+ assert_eq!(
+ parse("[]", false, int_field.clone())
+ .unwrap_err()
+ .to_string(),
+ "Json error: found 0 columns for 1 fields".to_owned()
+ );
+ assert_eq!(
+ parse("[]", false, struct_field.clone())
+ .unwrap_err()
+ .to_string(),
+ "Json error: found 0 columns for 1 fields".to_owned()
+ );
+ assert_eq!(
+ parse("[[]]", false, struct_field.clone())
+ .unwrap_err()
+ .to_string(),
+ "Json error: whilst decoding field 'r': found 0 columns for 1
fields".to_owned()
+ );
+ }
+
+ #[test]
+ fn test_decode_list_struct_with_wrong_types() {
+ let int_field = Field::new("a", DataType::Int32, true);
+ let struct_field = Field::new(
+ "r",
+ DataType::Struct(Fields::from(vec![int_field.clone()])),
+ true,
+ );
+
+ let parse = |row: &str, as_struct: bool, field: Field| {
+ _parse_structs(
+ row,
+ StructMode::ListOnly,
+ Fields::from(vec![field]),
+ as_struct,
+ )
+ };
+
+ // Wrong values
+ assert_eq!(
+ parse(r#"[["a"]]"#, false, struct_field.clone())
+ .unwrap_err()
+ .to_string(),
+ "Json error: whilst decoding field 'r': whilst decoding field 'a':
failed to parse \"a\" as Int32".to_owned()
+ );
+ assert_eq!(
+ parse(r#"[["a"]]"#, true, struct_field.clone())
+ .unwrap_err()
+ .to_string(),
+ "Json error: whilst decoding field 'r': whilst decoding field 'a':
failed to parse \"a\" as Int32".to_owned()
+ );
+ assert_eq!(
+ parse(r#"["a"]"#, true, int_field.clone())
+ .unwrap_err()
+ .to_string(),
+ "Json error: whilst decoding field 'a': failed to parse \"a\" as
Int32".to_owned()
+ );
+ assert_eq!(
+ parse(r#"["a"]"#, false, int_field.clone())
+ .unwrap_err()
+ .to_string(),
+ "Json error: whilst decoding field 'a': failed to parse \"a\" as
Int32".to_owned()
+ );
+ }
}
diff --git a/arrow-json/src/reader/struct_array.rs
b/arrow-json/src/reader/struct_array.rs
index 6c805591d3..b9408df77a 100644
--- a/arrow-json/src/reader/struct_array.rs
+++ b/arrow-json/src/reader/struct_array.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::reader::tape::{Tape, TapeElement};
-use crate::reader::{make_decoder, ArrayDecoder};
+use crate::reader::{make_decoder, ArrayDecoder, StructMode};
use arrow_array::builder::BooleanBufferBuilder;
use arrow_buffer::buffer::NullBuffer;
use arrow_data::{ArrayData, ArrayDataBuilder};
@@ -27,6 +27,7 @@ pub struct StructArrayDecoder {
decoders: Vec<Box<dyn ArrayDecoder>>,
strict_mode: bool,
is_nullable: bool,
+ struct_mode: StructMode,
}
impl StructArrayDecoder {
@@ -35,6 +36,7 @@ impl StructArrayDecoder {
coerce_primitive: bool,
strict_mode: bool,
is_nullable: bool,
+ struct_mode: StructMode,
) -> Result<Self, ArrowError> {
let decoders = struct_fields(&data_type)
.iter()
@@ -48,6 +50,7 @@ impl StructArrayDecoder {
coerce_primitive,
strict_mode,
nullable,
+ struct_mode,
)
})
.collect::<Result<Vec<_>, ArrowError>>()?;
@@ -57,6 +60,7 @@ impl StructArrayDecoder {
decoders,
strict_mode,
is_nullable,
+ struct_mode,
})
}
}
@@ -70,43 +74,86 @@ impl ArrayDecoder for StructArrayDecoder {
.is_nullable
.then(|| BooleanBufferBuilder::new(pos.len()));
- for (row, p) in pos.iter().enumerate() {
- let end_idx = match (tape.get(*p), nulls.as_mut()) {
- (TapeElement::StartObject(end_idx), None) => end_idx,
- (TapeElement::StartObject(end_idx), Some(nulls)) => {
- nulls.append(true);
- end_idx
- }
- (TapeElement::Null, Some(nulls)) => {
- nulls.append(false);
- continue;
+ // We avoid having the match on self.struct_mode inside the hot loop
for performance
+ // TODO: Investigate how to extract duplicated logic.
+ match self.struct_mode {
+ StructMode::ObjectOnly => {
+ for (row, p) in pos.iter().enumerate() {
+ let end_idx = match (tape.get(*p), nulls.as_mut()) {
+ (TapeElement::StartObject(end_idx), None) => end_idx,
+ (TapeElement::StartObject(end_idx), Some(nulls)) => {
+ nulls.append(true);
+ end_idx
+ }
+ (TapeElement::Null, Some(nulls)) => {
+ nulls.append(false);
+ continue;
+ }
+ (_, _) => return Err(tape.error(*p, "{")),
+ };
+
+ let mut cur_idx = *p + 1;
+ while cur_idx < end_idx {
+ // Read field name
+ let field_name = match tape.get(cur_idx) {
+ TapeElement::String(s) => tape.get_string(s),
+ _ => return Err(tape.error(cur_idx, "field name")),
+ };
+
+ // Update child pos if match found
+ match fields.iter().position(|x| x.name() ==
field_name) {
+ Some(field_idx) => child_pos[field_idx][row] =
cur_idx + 1,
+ None => {
+ if self.strict_mode {
+ return Err(ArrowError::JsonError(format!(
+ "column '{}' missing from schema",
+ field_name
+ )));
+ }
+ }
+ }
+ // Advance to next field
+ cur_idx = tape.next(cur_idx + 1, "field value")?;
+ }
}
- _ => return Err(tape.error(*p, "{")),
- };
-
- let mut cur_idx = *p + 1;
- while cur_idx < end_idx {
- // Read field name
- let field_name = match tape.get(cur_idx) {
- TapeElement::String(s) => tape.get_string(s),
- _ => return Err(tape.error(cur_idx, "field name")),
- };
-
- // Update child pos if match found
- match fields.iter().position(|x| x.name() == field_name) {
- Some(field_idx) => child_pos[field_idx][row] = cur_idx + 1,
- None => {
- if self.strict_mode {
+ }
+ StructMode::ListOnly => {
+ for (row, p) in pos.iter().enumerate() {
+ let end_idx = match (tape.get(*p), nulls.as_mut()) {
+ (TapeElement::StartList(end_idx), None) => end_idx,
+ (TapeElement::StartList(end_idx), Some(nulls)) => {
+ nulls.append(true);
+ end_idx
+ }
+ (TapeElement::Null, Some(nulls)) => {
+ nulls.append(false);
+ continue;
+ }
+ (_, _) => return Err(tape.error(*p, "[")),
+ };
+
+ let mut cur_idx = *p + 1;
+ let mut entry_idx = 0;
+ while cur_idx < end_idx {
+ if entry_idx >= fields.len() {
return Err(ArrowError::JsonError(format!(
- "column '{}' missing from schema",
- field_name
+ "found extra columns for {} fields",
+ fields.len()
)));
}
+ child_pos[entry_idx][row] = cur_idx;
+ entry_idx += 1;
+ // Advance to next field
+ cur_idx = tape.next(cur_idx, "field value")?;
+ }
+ if entry_idx != fields.len() {
+ return Err(ArrowError::JsonError(format!(
+ "found {} columns for {} fields",
+ entry_idx,
+ fields.len()
+ )));
}
}
-
- // Advance to next field
- cur_idx = tape.next(cur_idx + 1, "field value")?;
}
}
diff --git a/arrow-json/src/writer/encoder.rs b/arrow-json/src/writer/encoder.rs
index ed430fe6a1..0b3c788d55 100644
--- a/arrow-json/src/writer/encoder.rs
+++ b/arrow-json/src/writer/encoder.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::StructMode;
use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::*;
@@ -29,6 +30,7 @@ use std::io::Write;
#[derive(Debug, Clone, Default)]
pub struct EncoderOptions {
pub explicit_nulls: bool,
+ pub struct_mode: StructMode,
}
/// A trait to format array values as JSON values
@@ -135,6 +137,7 @@ fn make_encoder_impl<'a>(
let encoder = StructArrayEncoder{
encoders,
explicit_nulls: options.explicit_nulls,
+ struct_mode: options.struct_mode,
};
(Box::new(encoder) as _, array.nulls().cloned())
}
@@ -172,6 +175,7 @@ struct FieldEncoder<'a> {
struct StructArrayEncoder<'a> {
encoders: Vec<FieldEncoder<'a>>,
explicit_nulls: bool,
+ struct_mode: StructMode,
}
/// This API is only stable since 1.70 so can't use it when current MSRV is
lower
@@ -185,11 +189,16 @@ fn is_some_and<T>(opt: Option<T>, f: impl FnOnce(T) ->
bool) -> bool {
impl Encoder for StructArrayEncoder<'_> {
fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
- out.push(b'{');
+ match self.struct_mode {
+ StructMode::ObjectOnly => out.push(b'{'),
+ StructMode::ListOnly => out.push(b'['),
+ }
let mut is_first = true;
+ // Nulls can only be dropped in ObjectOnly mode when explicit_nulls
+ // is false; in ListOnly mode every slot must be written to keep
+ // positions aligned with the struct fields
+ let drop_nulls = (self.struct_mode == StructMode::ObjectOnly) &&
!self.explicit_nulls;
for field_encoder in &mut self.encoders {
let is_null = is_some_and(field_encoder.nulls.as_ref(), |n|
n.is_null(idx));
- if is_null && !self.explicit_nulls {
+ if drop_nulls && is_null {
continue;
}
@@ -198,15 +207,20 @@ impl Encoder for StructArrayEncoder<'_> {
}
is_first = false;
- encode_string(field_encoder.field.name(), out);
- out.push(b':');
+ if self.struct_mode == StructMode::ObjectOnly {
+ encode_string(field_encoder.field.name(), out);
+ out.push(b':');
+ }
match is_null {
true => out.extend_from_slice(b"null"),
false => field_encoder.encoder.encode(idx, out),
}
}
- out.push(b'}');
+ match self.struct_mode {
+ StructMode::ObjectOnly => out.push(b'}'),
+ StructMode::ListOnly => out.push(b']'),
+ }
}
}
diff --git a/arrow-json/src/writer/mod.rs b/arrow-json/src/writer/mod.rs
index ee6d83a0a1..5d3e558480 100644
--- a/arrow-json/src/writer/mod.rs
+++ b/arrow-json/src/writer/mod.rs
@@ -108,6 +108,7 @@ mod encoder;
use std::{fmt::Debug, io::Write};
+use crate::StructMode;
use arrow_array::*;
use arrow_schema::*;
@@ -247,12 +248,28 @@ impl WriterBuilder {
/// {"foo":null,"bar":null}
/// ```
///
- /// Default is to skip nulls (set to `false`).
+ /// Default is to skip nulls (set to `false`). If `struct_mode ==
ListOnly`,
+ /// nulls will be written explicitly regardless of this setting.
pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
self.0.explicit_nulls = explicit_nulls;
self
}
+ /// Returns if this writer is configured to write structs as JSON Objects
or Arrays.
+ pub fn struct_mode(&self) -> StructMode {
+ self.0.struct_mode
+ }
+
+ /// Set the [`StructMode`] for the writer, which determines whether structs
+ /// are encoded to JSON as objects or lists. For more details refer to the
+ /// enum documentation. Default is to use `ObjectOnly`. If this is set to
+ /// `ListOnly`, nulls will be written explicitly regardless of the
+ /// `explicit_nulls` setting.
+ pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
+ self.0.struct_mode = struct_mode;
+ self
+ }
+
/// Create a new `Writer` with specified `JsonFormat` and builder options.
pub fn build<W, F>(self, writer: W) -> Writer<W, F>
where
@@ -1953,4 +1970,70 @@ mod tests {
"#,
);
}
+
+ #[test]
+ fn write_structs_as_list() {
+ let schema = Schema::new(vec![
+ Field::new(
+ "c1",
+ DataType::Struct(Fields::from(vec![
+ Field::new("c11", DataType::Int32, true),
+ Field::new(
+ "c12",
+ DataType::Struct(vec![Field::new("c121",
DataType::Utf8, false)].into()),
+ false,
+ ),
+ ])),
+ false,
+ ),
+ Field::new("c2", DataType::Utf8, false),
+ ]);
+
+ let c1 = StructArray::from(vec![
+ (
+ Arc::new(Field::new("c11", DataType::Int32, true)),
+ Arc::new(Int32Array::from(vec![Some(1), None, Some(5)])) as
ArrayRef,
+ ),
+ (
+ Arc::new(Field::new(
+ "c12",
+ DataType::Struct(vec![Field::new("c121", DataType::Utf8,
false)].into()),
+ false,
+ )),
+ Arc::new(StructArray::from(vec![(
+ Arc::new(Field::new("c121", DataType::Utf8, false)),
+ Arc::new(StringArray::from(vec![Some("e"), Some("f"),
Some("g")])) as ArrayRef,
+ )])) as ArrayRef,
+ ),
+ ]);
+ let c2 = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
+
+ let batch =
+ RecordBatch::try_new(Arc::new(schema), vec![Arc::new(c1),
Arc::new(c2)]).unwrap();
+
+ let expected = r#"[[1,["e"]],"a"]
+[[null,["f"]],"b"]
+[[5,["g"]],"c"]
+"#;
+
+ let mut buf = Vec::new();
+ {
+ let builder = WriterBuilder::new()
+ .with_explicit_nulls(true)
+ .with_struct_mode(StructMode::ListOnly);
+ let mut writer = builder.build::<_, LineDelimited>(&mut buf);
+ writer.write_batches(&[&batch]).unwrap();
+ }
+ assert_json_eq(&buf, expected);
+
+ let mut buf = Vec::new();
+ {
+ let builder = WriterBuilder::new()
+ .with_explicit_nulls(false)
+ .with_struct_mode(StructMode::ListOnly);
+ let mut writer = builder.build::<_, LineDelimited>(&mut buf);
+ writer.write_batches(&[&batch]).unwrap();
+ }
+ assert_json_eq(&buf, expected);
+ }
}