This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 43d984e0b3 Add a JSON reader option to ignore type conflicts (#7276)
43d984e0b3 is described below
commit 43d984e0b3584841991e7169cf62a35b7025b691
Author: Ryan Johnson <[email protected]>
AuthorDate: Tue Apr 7 07:37:59 2026 -0600
Add a JSON reader option to ignore type conflicts (#7276)
# Which issue does this PR close?
- Closes https://github.com/apache/arrow-rs/issues/7230
# Rationale for this change
JSON data is notoriously non-homogeneous, but the JSON parser today is
super strict -- it requires a concrete schema and parsing fails if any
field of any row encounters a type conflict. In such cases, it can be
preferable for incompatible fields to parse as NULL instead of producing
a hard error.
# What changes are included in this PR?
Adds a new method
`arrow_json::reader::ReaderBuilder::with_ignore_type_conflicts`, which
can override the default behavior of throwing on type conflict, to
return NULL values instead.
Plumb that flag through to all ten decoders so they honor it: Null,
Boolean, Primitive, Decimal, Timestamp, String, StringView, List, Map,
Struct.
Add both positive and negative unit tests for each decoder type, to
ensure the plumbing worked.
# Are there any user-facing changes?
New API method, see above.
---
arrow-json/src/reader/boolean_array.rs | 14 +-
arrow-json/src/reader/decimal_array.rs | 54 ++--
arrow-json/src/reader/list_array.rs | 6 +
arrow-json/src/reader/map_array.rs | 6 +
arrow-json/src/reader/mod.rs | 466 +++++++++++++++++++++++++++--
arrow-json/src/reader/null_array.rs | 21 +-
arrow-json/src/reader/primitive_array.rs | 56 ++--
arrow-json/src/reader/string_array.rs | 10 +-
arrow-json/src/reader/string_view_array.rs | 14 +-
arrow-json/src/reader/struct_array.rs | 10 +
arrow-json/src/reader/timestamp_array.rs | 49 +--
11 files changed, 587 insertions(+), 119 deletions(-)
diff --git a/arrow-json/src/reader/boolean_array.rs
b/arrow-json/src/reader/boolean_array.rs
index 17c0586dfa..2f18cff318 100644
--- a/arrow-json/src/reader/boolean_array.rs
+++ b/arrow-json/src/reader/boolean_array.rs
@@ -21,11 +21,20 @@ use arrow_array::ArrayRef;
use arrow_array::builder::BooleanBuilder;
use arrow_schema::ArrowError;
-use crate::reader::ArrayDecoder;
use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{ArrayDecoder, DecoderContext};
#[derive(Default)]
-pub struct BooleanArrayDecoder {}
+pub struct BooleanArrayDecoder {
+ ignore_type_conflicts: bool,
+}
+impl BooleanArrayDecoder {
+ pub fn new(ctx: &DecoderContext) -> Self {
+ Self {
+ ignore_type_conflicts: ctx.ignore_type_conflicts(),
+ }
+ }
+}
impl ArrayDecoder for BooleanArrayDecoder {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef,
ArrowError> {
@@ -35,6 +44,7 @@ impl ArrayDecoder for BooleanArrayDecoder {
TapeElement::Null => builder.append_null(),
TapeElement::True => builder.append_value(true),
TapeElement::False => builder.append_value(false),
+ _ if self.ignore_type_conflicts => builder.append_null(),
_ => return Err(tape.error(*p, "boolean")),
}
}
diff --git a/arrow-json/src/reader/decimal_array.rs
b/arrow-json/src/reader/decimal_array.rs
index c9936e04a4..4eaab4847d 100644
--- a/arrow-json/src/reader/decimal_array.rs
+++ b/arrow-json/src/reader/decimal_array.rs
@@ -24,21 +24,23 @@ use arrow_array::types::DecimalType;
use arrow_cast::parse::parse_decimal;
use arrow_schema::ArrowError;
-use crate::reader::ArrayDecoder;
use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{ArrayDecoder, DecoderContext};
pub struct DecimalArrayDecoder<D: DecimalType> {
precision: u8,
scale: i8,
+ ignore_type_conflicts: bool,
// Invariant and Send
phantom: PhantomData<fn(D) -> D>,
}
impl<D: DecimalType> DecimalArrayDecoder<D> {
- pub fn new(precision: u8, scale: i8) -> Self {
+ pub fn new(ctx: &DecoderContext, precision: u8, scale: i8) -> Self {
Self {
precision,
scale,
+ ignore_type_conflicts: ctx.ignore_type_conflicts(),
phantom: PhantomData,
}
}
@@ -51,46 +53,48 @@ where
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef,
ArrowError> {
let mut builder = PrimitiveBuilder::<D>::with_capacity(pos.len());
+ #[allow(unused)] // initial value overwritten without ever being read
+ let mut anchor = String::default();
for p in pos {
- match tape.get(*p) {
- TapeElement::Null => builder.append_null(),
- TapeElement::String(idx) => {
- let s = tape.get_string(idx);
- let value = parse_decimal::<D>(s, self.precision,
self.scale)?;
- builder.append_value(value)
- }
- TapeElement::Number(idx) => {
- let s = tape.get_string(idx);
- let value = parse_decimal::<D>(s, self.precision,
self.scale)?;
- builder.append_value(value)
+ let value = match tape.get(*p) {
+ TapeElement::Null => {
+ builder.append_null();
+ continue;
}
+ TapeElement::String(idx) | TapeElement::Number(idx) =>
tape.get_string(idx),
TapeElement::I64(high) => match tape.get(*p + 1) {
TapeElement::I32(low) => {
- let val = (((high as i64) << 32) | (low as u32) as
i64).to_string();
- let value = parse_decimal::<D>(&val, self.precision,
self.scale)?;
- builder.append_value(value)
+ anchor = (((high as i64) << 32) | (low as u32) as
i64).to_string();
+ anchor.as_str()
}
_ => unreachable!(),
},
TapeElement::I32(val) => {
- let s = val.to_string();
- let value = parse_decimal::<D>(&s, self.precision,
self.scale)?;
- builder.append_value(value)
+ anchor = val.to_string();
+ anchor.as_str()
}
TapeElement::F64(high) => match tape.get(*p + 1) {
TapeElement::F32(low) => {
- let val = f64::from_bits(((high as u64) << 32) | low
as u64).to_string();
- let value = parse_decimal::<D>(&val, self.precision,
self.scale)?;
- builder.append_value(value)
+ anchor = f64::from_bits(((high as u64) << 32) | low as
u64).to_string();
+ anchor.as_str()
}
_ => unreachable!(),
},
TapeElement::F32(val) => {
- let s = f32::from_bits(val).to_string();
- let value = parse_decimal::<D>(&s, self.precision,
self.scale)?;
- builder.append_value(value)
+ anchor = f32::from_bits(val).to_string();
+ anchor.as_str()
+ }
+ _ if self.ignore_type_conflicts => {
+ builder.append_null();
+ continue;
}
_ => return Err(tape.error(*p, "decimal")),
+ };
+
+ match parse_decimal::<D>(value, self.precision, self.scale) {
+ Ok(value) => builder.append_value(value),
+ Err(_) if self.ignore_type_conflicts => builder.append_null(),
+ Err(e) => return Err(e),
}
}
diff --git a/arrow-json/src/reader/list_array.rs
b/arrow-json/src/reader/list_array.rs
index b11124576d..113e628541 100644
--- a/arrow-json/src/reader/list_array.rs
+++ b/arrow-json/src/reader/list_array.rs
@@ -35,6 +35,7 @@ pub struct ListLikeArrayDecoder<O, const IS_VIEW: bool> {
field: FieldRef,
decoder: Box<dyn ArrayDecoder>,
phantom: PhantomData<O>,
+ ignore_type_conflicts: bool,
is_nullable: bool,
}
@@ -57,6 +58,7 @@ impl<O: OffsetSizeTrait, const IS_VIEW: bool>
ListLikeArrayDecoder<O, IS_VIEW> {
field: field.clone(),
decoder,
phantom: Default::default(),
+ ignore_type_conflicts: ctx.ignore_type_conflicts(),
is_nullable,
})
}
@@ -83,6 +85,10 @@ impl<O: OffsetSizeTrait, const IS_VIEW: bool> ArrayDecoder
for ListLikeArrayDeco
nulls.append(false);
*p + 1
}
+ (_, Some(nulls)) if self.ignore_type_conflicts => {
+ nulls.append(false);
+ *p + 1
+ }
_ => return Err(tape.error(*p, "[")),
};
diff --git a/arrow-json/src/reader/map_array.rs
b/arrow-json/src/reader/map_array.rs
index 4ec855a666..87cd84cc3e 100644
--- a/arrow-json/src/reader/map_array.rs
+++ b/arrow-json/src/reader/map_array.rs
@@ -32,6 +32,7 @@ pub struct MapArrayDecoder {
ordered: bool,
keys: Box<dyn ArrayDecoder>,
values: Box<dyn ArrayDecoder>,
+ ignore_type_conflicts: bool,
is_nullable: bool,
}
@@ -75,6 +76,7 @@ impl MapArrayDecoder {
ordered,
keys,
values,
+ ignore_type_conflicts: ctx.ignore_type_conflicts(),
is_nullable,
})
}
@@ -103,6 +105,10 @@ impl ArrayDecoder for MapArrayDecoder {
nulls.append(false);
p + 1
}
+ (_, Some(nulls)) if self.ignore_type_conflicts => {
+ nulls.append(false);
+ p + 1
+ }
_ => return Err(tape.error(p, "{")),
};
diff --git a/arrow-json/src/reader/mod.rs b/arrow-json/src/reader/mod.rs
index 497ec4c3f3..62c13c70ed 100644
--- a/arrow-json/src/reader/mod.rs
+++ b/arrow-json/src/reader/mod.rs
@@ -187,6 +187,7 @@ pub struct ReaderBuilder {
batch_size: usize,
coerce_primitive: bool,
strict_mode: bool,
+ ignore_type_conflicts: bool,
is_field: bool,
struct_mode: StructMode,
@@ -207,6 +208,7 @@ impl ReaderBuilder {
batch_size: 1024,
coerce_primitive: false,
strict_mode: false,
+ ignore_type_conflicts: false,
is_field: false,
struct_mode: Default::default(),
schema,
@@ -248,6 +250,7 @@ impl ReaderBuilder {
batch_size: 1024,
coerce_primitive: false,
strict_mode: false,
+ ignore_type_conflicts: false,
is_field: true,
struct_mode: Default::default(),
schema: Arc::new(Schema::new([field.into()])),
@@ -290,6 +293,25 @@ impl ReaderBuilder {
}
}
+ /// Sets whether the decoder should produce NULL instead of returning an
error if it encounters
+ /// a value that cannot be parsed into the specified column type.
+ ///
+ /// For example, if the type is declared to be a nullable array of
`DataType::Int32` but the
+ /// reader encounters a string value `"foo"` and the value of
`ignore_type_conflicts` is:
+ ///
+ /// * `false` (the default): The reader will return an error.
+ ///
+ /// * `true`: The reader will fill in a NULL value for that array element.
+ ///
+ /// NOTE: An inferred NULL due to a type conflict will still produce
parsing errors for
+ /// non-nullable fields, the same as any other NULL or missing value.
+ pub fn with_ignore_type_conflicts(self, ignore_type_conflicts: bool) ->
Self {
+ Self {
+ ignore_type_conflicts,
+ ..self
+ }
+ }
+
/// Create a [`Reader`] with the provided [`BufRead`]
pub fn build<R: BufRead>(self, reader: R) -> Result<Reader<R>, ArrowError>
{
Ok(Reader {
@@ -313,6 +335,7 @@ impl ReaderBuilder {
coerce_primitive: self.coerce_primitive,
strict_mode: self.strict_mode,
struct_mode: self.struct_mode,
+ ignore_type_conflicts: self.ignore_type_conflicts,
};
let decoder = ctx.make_decoder(data_type.as_ref(), nullable)?;
@@ -695,6 +718,8 @@ pub struct DecoderContext {
strict_mode: bool,
/// How to decode struct fields
struct_mode: StructMode,
+ /// Whether to treat columns with incompatible types as missing (i.e. NULL)
+ ignore_type_conflicts: bool,
}
impl DecoderContext {
@@ -713,6 +738,11 @@ impl DecoderContext {
self.struct_mode
}
+ /// Returns whether to treat columns with incompatible types as missing
(i.e. NULL)
+ pub fn ignore_type_conflicts(&self) -> bool {
+ self.ignore_type_conflicts
+ }
+
/// Create a decoder for a type.
///
/// This is the standard way to create child decoders from within a decoder
@@ -726,51 +756,62 @@ impl DecoderContext {
}
}
-macro_rules! primitive_decoder {
- ($t:ty, $data_type:expr) => {
- Ok(Box::new(PrimitiveArrayDecoder::<$t>::new($data_type)))
- };
-}
-
fn make_decoder(
ctx: &DecoderContext,
data_type: &DataType,
is_nullable: bool,
) -> Result<Box<dyn ArrayDecoder>, ArrowError> {
- let coerce_primitive = ctx.coerce_primitive();
+ macro_rules! primitive_decoder {
+ ($t:ty, $data_type:expr) => {
+ Ok(Box::new(PrimitiveArrayDecoder::<$t>::new(ctx, $data_type)))
+ };
+ }
+ macro_rules! timestamp_decoder {
+ ($t:ty, $data_type:expr, $tz:expr) => {{
+ Ok(Box::new(TimestampArrayDecoder::<$t, _>::new(
+ ctx, $data_type, $tz,
+ )))
+ }};
+ }
+ macro_rules! decimal_decoder {
+ ($t:ty, $p:expr, $s:expr) => {
+ Ok(Box::new(DecimalArrayDecoder::<$t>::new(ctx, $p, $s)))
+ };
+ }
+
downcast_integer! {
*data_type => (primitive_decoder, data_type),
- DataType::Null => Ok(Box::<NullArrayDecoder>::default()),
+ DataType::Null => Ok(Box::new(NullArrayDecoder::new(ctx))),
DataType::Float16 => primitive_decoder!(Float16Type, data_type),
DataType::Float32 => primitive_decoder!(Float32Type, data_type),
DataType::Float64 => primitive_decoder!(Float64Type, data_type),
DataType::Timestamp(TimeUnit::Second, None) => {
- Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType,
_>::new(data_type, Utc)))
+ timestamp_decoder!(TimestampSecondType, data_type, Utc)
},
DataType::Timestamp(TimeUnit::Millisecond, None) => {
- Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType,
_>::new(data_type, Utc)))
+ timestamp_decoder!(TimestampMillisecondType, data_type, Utc)
},
DataType::Timestamp(TimeUnit::Microsecond, None) => {
- Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType,
_>::new(data_type, Utc)))
+ timestamp_decoder!(TimestampMicrosecondType, data_type, Utc)
},
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
- Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType,
_>::new(data_type, Utc)))
+ timestamp_decoder!(TimestampNanosecondType, data_type, Utc)
},
DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
let tz: Tz = tz.parse()?;
- Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType,
_>::new(data_type, tz)))
+ timestamp_decoder!(TimestampSecondType, data_type, tz)
},
DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
let tz: Tz = tz.parse()?;
- Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType,
_>::new(data_type, tz)))
+ timestamp_decoder!(TimestampMillisecondType, data_type, tz)
},
DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
let tz: Tz = tz.parse()?;
- Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType,
_>::new(data_type, tz)))
+ timestamp_decoder!(TimestampMicrosecondType, data_type, tz)
},
DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
let tz: Tz = tz.parse()?;
- Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType,
_>::new(data_type, tz)))
+ timestamp_decoder!(TimestampNanosecondType, data_type, tz)
},
DataType::Date32 => primitive_decoder!(Date32Type, data_type),
DataType::Date64 => primitive_decoder!(Date64Type, data_type),
@@ -782,14 +823,14 @@ fn make_decoder(
DataType::Duration(TimeUnit::Microsecond) =>
primitive_decoder!(DurationMicrosecondType, data_type),
DataType::Duration(TimeUnit::Millisecond) =>
primitive_decoder!(DurationMillisecondType, data_type),
DataType::Duration(TimeUnit::Second) =>
primitive_decoder!(DurationSecondType, data_type),
- DataType::Decimal32(p, s) =>
Ok(Box::new(DecimalArrayDecoder::<Decimal32Type>::new(p, s))),
- DataType::Decimal64(p, s) =>
Ok(Box::new(DecimalArrayDecoder::<Decimal64Type>::new(p, s))),
- DataType::Decimal128(p, s) =>
Ok(Box::new(DecimalArrayDecoder::<Decimal128Type>::new(p, s))),
- DataType::Decimal256(p, s) =>
Ok(Box::new(DecimalArrayDecoder::<Decimal256Type>::new(p, s))),
- DataType::Boolean => Ok(Box::<BooleanArrayDecoder>::default()),
- DataType::Utf8 =>
Ok(Box::new(StringArrayDecoder::<i32>::new(coerce_primitive))),
- DataType::Utf8View =>
Ok(Box::new(StringViewArrayDecoder::new(coerce_primitive))),
- DataType::LargeUtf8 =>
Ok(Box::new(StringArrayDecoder::<i64>::new(coerce_primitive))),
+ DataType::Decimal32(p, s) => decimal_decoder!(Decimal32Type, p, s),
+ DataType::Decimal64(p, s) => decimal_decoder!(Decimal64Type, p, s),
+ DataType::Decimal128(p, s) => decimal_decoder!(Decimal128Type, p, s),
+ DataType::Decimal256(p, s) => decimal_decoder!(Decimal256Type, p, s),
+ DataType::Boolean => Ok(Box::new(BooleanArrayDecoder::new(ctx))),
+ DataType::Utf8 => Ok(Box::new(StringArrayDecoder::<i32>::new(ctx))),
+ DataType::Utf8View => Ok(Box::new(StringViewArrayDecoder::new(ctx))),
+ DataType::LargeUtf8 =>
Ok(Box::new(StringArrayDecoder::<i64>::new(ctx))),
DataType::List(_) => Ok(Box::new(ListArrayDecoder::<i32>::new(ctx,
data_type, is_nullable)?)),
DataType::LargeList(_) =>
Ok(Box::new(ListArrayDecoder::<i64>::new(ctx, data_type, is_nullable)?)),
DataType::ListView(_) =>
Ok(Box::new(ListViewArrayDecoder::<i32>::new(ctx, data_type, is_nullable)?)),
@@ -812,19 +853,18 @@ fn make_decoder(
#[cfg(test)]
mod tests {
- use serde_json::json;
- use std::fs::File;
- use std::io::{BufReader, Cursor, Seek};
-
use arrow_array::cast::AsArray;
use arrow_array::{
- Array, BooleanArray, Float64Array, GenericListViewArray, ListArray,
OffsetSizeTrait,
- StringArray, StringViewArray, StructArray, make_array,
+ Array, BooleanArray, Float64Array, GenericListViewArray, Int32Array,
ListArray, MapArray,
+ NullArray, OffsetSizeTrait, StringArray, StringViewArray, StructArray,
make_array,
};
- use arrow_buffer::{ArrowNativeType, Buffer};
+ use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer};
use arrow_cast::display::{ArrayFormatter, FormatOptions};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{Field, Fields};
+ use serde_json::json;
+ use std::fs::File;
+ use std::io::{BufReader, Cursor, Seek};
use super::*;
@@ -2944,6 +2984,370 @@ mod tests {
);
}
+ #[test]
+ fn test_type_conflict_nulls() {
+ let schema = Schema::new(vec![
+ Field::new("null", DataType::Null, true),
+ Field::new("bool", DataType::Boolean, true),
+ Field::new("primitive", DataType::Int32, true),
+ Field::new("numeric", DataType::Decimal128(10, 3), true),
+ Field::new("string", DataType::Utf8, true),
+ Field::new("string_view", DataType::Utf8View, true),
+ Field::new(
+ "timestamp",
+ DataType::Timestamp(TimeUnit::Second, None),
+ true,
+ ),
+ Field::new(
+ "array",
+ DataType::List(Arc::new(Field::new("item", DataType::Int32,
true))),
+ true,
+ ),
+ Field::new(
+ "map",
+ DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(Fields::from(vec![
+ Field::new("keys", DataType::Utf8, false),
+ Field::new("values", DataType::Utf8, true),
+ ])),
+ false, // not nullable
+ )),
+ false, // not sorted
+ ),
+ true, // nullable
+ ),
+ Field::new(
+ "struct",
+ DataType::Struct(Fields::from(vec![Field::new("a",
DataType::Int32, true)])),
+ true,
+ ),
+ ]);
+
+ // A compatible value for each schema field above, in schema order
+ let json_values = vec![
+ json!(null),
+ json!(true),
+ json!(42),
+ json!(1.234),
+ json!("hi"),
+ json!("ho"),
+ json!("1970-01-01T00:00:00+02:00"),
+ json!([1, "ho", 3]),
+ json!({"k": "value"}),
+ json!({"a": 1}),
+ ];
+
+ // Create a set of JSON rows that rotates each value past every field
+ let json: Vec<_> = (0..json_values.len())
+ .map(|i| {
+ let pairs = json_values[i..]
+ .iter()
+ .chain(json_values[..i].iter())
+ .zip(&schema.fields)
+ .map(|(v, f)| (f.name().to_string(), v.clone()))
+ .collect();
+ serde_json::Value::Object(pairs)
+ })
+ .collect();
+ let mut decoder = ReaderBuilder::new(Arc::new(schema))
+ .with_ignore_type_conflicts(true)
+ .with_coerce_primitive(true)
+ .build_decoder()
+ .unwrap();
+ decoder.serialize(&json).unwrap();
+ let batch = decoder.flush().unwrap().unwrap();
+ assert_eq!(batch.num_rows(), 10);
+ assert_eq!(batch.num_columns(), 10);
+
+ // NOTE: NullArray doesn't materialize any values (they're all NULL by
definition)
+ let _ = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<NullArray>()
+ .unwrap();
+
+ assert!(
+ batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap()
+ .iter()
+ .eq([
+ Some(true),
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None
+ ])
+ );
+
+ assert!(batch.column(2).as_primitive::<Int32Type>().iter().eq([
+ Some(42),
+ Some(1),
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None
+ ]));
+
+ assert!(batch.column(3).as_primitive::<Decimal128Type>().iter().eq([
+ Some(1234),
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ None,
+ Some(42000)
+ ]));
+
+ assert!(
+ batch
+ .column(4)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap()
+ .iter()
+ .eq([
+ Some("hi"),
+ Some("ho"),
+ Some("1970-01-01T00:00:00+02:00"),
+ None,
+ None,
+ None,
+ None,
+ Some("true"),
+ Some("42"),
+ Some("1.234"),
+ ])
+ );
+
+ assert!(
+ batch
+ .column(5)
+ .as_any()
+ .downcast_ref::<StringViewArray>()
+ .unwrap()
+ .iter()
+ .eq([
+ Some("ho"),
+ Some("1970-01-01T00:00:00+02:00"),
+ None,
+ None,
+ None,
+ None,
+ Some("true"),
+ Some("42"),
+ Some("1.234"),
+ Some("hi"),
+ ])
+ );
+
+ assert!(
+ batch
+ .column(6)
+ .as_primitive::<TimestampSecondType>()
+ .iter()
+ .eq([
+ Some(-7200),
+ None,
+ None,
+ None,
+ None,
+ None,
+ Some(42),
+ None,
+ None,
+ None,
+ ])
+ );
+
+ let arrays = batch
+ .column(7)
+ .as_any()
+ .downcast_ref::<ListArray>()
+ .unwrap();
+ assert_eq!(
+ arrays.nulls(),
+ Some(&NullBuffer::from(
+ &[
+ true, false, false, false, false, false, false, false,
false, false
+ ][..]
+ ))
+ );
+ assert_eq!(arrays.offsets()[1], 3);
+ let array_values = arrays
+ .values()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert!(array_values.iter().eq([Some(1), None, Some(3)]));
+
+ let maps =
batch.column(8).as_any().downcast_ref::<MapArray>().unwrap();
+ assert_eq!(
+ maps.nulls(),
+ Some(&NullBuffer::from(
+ // Both map and struct can parse
+ &[
+ true, true, false, false, false, false, false, false,
false, false
+ ][..]
+ ))
+ );
+ let map_keys =
maps.keys().as_any().downcast_ref::<StringArray>().unwrap();
+ assert!(map_keys.iter().eq([Some("k"), Some("a")]));
+ let map_values = maps
+ .values()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert!(map_values.iter().eq([Some("value"), Some("1")]));
+
+ let structs = batch
+ .column(9)
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .unwrap();
+ assert_eq!(
+ structs.nulls(),
+ Some(&NullBuffer::from(
+ // Both map and struct can parse
+ &[
+ true, false, false, false, false, false, false, false,
false, true
+ ][..]
+ ))
+ );
+ let struct_fields = structs
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert!(struct_fields.slice(0, 2).iter().eq([Some(1), None]));
+ }
+
+ #[test]
+ fn test_type_conflict_non_nullable() {
+ let fields = [
+ Field::new("bool", DataType::Boolean, false),
+ Field::new("primitive", DataType::Int32, false),
+ Field::new("numeric", DataType::Decimal128(10, 3), false),
+ Field::new("string", DataType::Utf8, false),
+ Field::new("string_view", DataType::Utf8View, false),
+ Field::new(
+ "timestamp",
+ DataType::Timestamp(TimeUnit::Second, None),
+ false,
+ ),
+ Field::new(
+ "array",
+ DataType::List(Arc::new(Field::new("item", DataType::Int32,
true))),
+ false,
+ ),
+ Field::new(
+ "map",
+ DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(Fields::from(vec![
+ Field::new("keys", DataType::Utf8, false),
+ Field::new("values", DataType::Utf8, true),
+ ])),
+ false, // not nullable
+ )),
+ false, // not sorted
+ ),
+ false, // not nullable
+ ),
+ Field::new(
+ "struct",
+ DataType::Struct(Fields::from(vec![Field::new("a",
DataType::Int32, true)])),
+ false,
+ ),
+ ];
+
+ // Every field above will have a type conflict with at least one of
these values
+ let json_values = vec![json!(true), json!({"a": 1})];
+
+ for field in fields {
+ let mut decoder = ReaderBuilder::new_with_field(field)
+ .with_ignore_type_conflicts(true)
+ .build_decoder()
+ .unwrap();
+ decoder.serialize(&json_values).unwrap();
+ decoder
+ .flush()
+ .expect_err("type conflict on non-nullable type");
+ }
+ }
+
+ #[test]
+ fn test_ignore_type_conflicts_disabled() {
+ let fields = [
+ Field::new("null", DataType::Null, true),
+ Field::new("bool", DataType::Boolean, true),
+ Field::new("primitive", DataType::Int32, true),
+ Field::new("numeric", DataType::Decimal128(10, 3), true),
+ Field::new("string", DataType::Utf8, true),
+ Field::new("string_view", DataType::Utf8View, true),
+ Field::new(
+ "timestamp",
+ DataType::Timestamp(TimeUnit::Second, None),
+ true,
+ ),
+ Field::new(
+ "array",
+ DataType::List(Arc::new(Field::new("item", DataType::Int32,
true))),
+ true,
+ ),
+ Field::new(
+ "map",
+ DataType::Map(
+ Arc::new(Field::new(
+ "entries",
+ DataType::Struct(Fields::from(vec![
+ Field::new("keys", DataType::Utf8, false),
+ Field::new("values", DataType::Utf8, true),
+ ])),
+ false, // not nullable
+ )),
+ false, // not sorted
+ ),
+ true, // nullable
+ ),
+ Field::new(
+ "struct",
+ DataType::Struct(Fields::from(vec![Field::new("a",
DataType::Int32, true)])),
+ true,
+ ),
+ ];
+
+ // Every field above will have a type conflict with at least one of
these values
+ let json_values = vec![json!(true), json!({"a": 1})];
+
+ for field in fields {
+ let mut decoder = ReaderBuilder::new_with_field(field)
+ .build_decoder()
+ .unwrap();
+ decoder.serialize(&json_values).unwrap();
+ decoder
+ .flush()
+ .expect_err("type conflict on non-nullable type");
+ }
+ }
+
#[test]
fn test_read_run_end_encoded() {
let buf = r#"
diff --git a/arrow-json/src/reader/null_array.rs
b/arrow-json/src/reader/null_array.rs
index 9c6ac3d288..6a9660cfa3 100644
--- a/arrow-json/src/reader/null_array.rs
+++ b/arrow-json/src/reader/null_array.rs
@@ -20,17 +20,28 @@ use std::sync::Arc;
use arrow_array::{ArrayRef, NullArray};
use arrow_schema::ArrowError;
-use crate::reader::ArrayDecoder;
use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{ArrayDecoder, DecoderContext};
#[derive(Default)]
-pub struct NullArrayDecoder {}
+pub struct NullArrayDecoder {
+ ignore_type_conflicts: bool,
+}
+impl NullArrayDecoder {
+ pub fn new(ctx: &DecoderContext) -> Self {
+ Self {
+ ignore_type_conflicts: ctx.ignore_type_conflicts(),
+ }
+ }
+}
impl ArrayDecoder for NullArrayDecoder {
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef,
ArrowError> {
- for p in pos {
- if !matches!(tape.get(*p), TapeElement::Null) {
- return Err(tape.error(*p, "null"));
+ if !self.ignore_type_conflicts {
+ for p in pos {
+ if !matches!(tape.get(*p), TapeElement::Null) {
+ return Err(tape.error(*p, "null"));
+ }
}
}
Ok(Arc::new(NullArray::new(pos.len())))
diff --git a/arrow-json/src/reader/primitive_array.rs
b/arrow-json/src/reader/primitive_array.rs
index 559b82ea83..b086954297 100644
--- a/arrow-json/src/reader/primitive_array.rs
+++ b/arrow-json/src/reader/primitive_array.rs
@@ -25,8 +25,8 @@ use arrow_schema::{ArrowError, DataType};
use half::f16;
use num_traits::NumCast;
-use crate::reader::ArrayDecoder;
use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{ArrayDecoder, DecoderContext};
/// A trait for JSON-specific primitive parsing logic
///
@@ -75,14 +75,16 @@ impl ParseJsonNumber for f64 {
pub struct PrimitiveArrayDecoder<P: ArrowPrimitiveType> {
data_type: DataType,
+ ignore_type_conflicts: bool,
// Invariant and Send
phantom: PhantomData<fn(P) -> P>,
}
impl<P: ArrowPrimitiveType> PrimitiveArrayDecoder<P> {
- pub fn new(data_type: &DataType) -> Self {
+ pub fn new(ctx: &DecoderContext, data_type: &DataType) -> Self {
Self {
data_type: data_type.clone(),
+ ignore_type_conflicts: ctx.ignore_type_conflicts(),
phantom: Default::default(),
}
}
@@ -99,58 +101,56 @@ where
let d = &self.data_type;
for p in pos {
- match tape.get(*p) {
- TapeElement::Null => builder.append_null(),
+ let value = match tape.get(*p) {
+ TapeElement::Null => {
+ builder.append_null();
+ continue;
+ }
TapeElement::String(idx) => {
let s = tape.get_string(idx);
- let value = P::parse(s).ok_or_else(|| {
+ P::parse(s).ok_or_else(|| {
ArrowError::JsonError(format!("failed to parse \"{s}\"
as {d}",))
- })?;
-
- builder.append_value(value)
+ })
}
TapeElement::Number(idx) => {
let s = tape.get_string(idx);
- let value =
ParseJsonNumber::parse(s.as_bytes()).ok_or_else(|| {
+ ParseJsonNumber::parse(s.as_bytes()).ok_or_else(|| {
ArrowError::JsonError(format!("failed to parse {s} as
{d}",))
- })?;
-
- builder.append_value(value)
+ })
}
TapeElement::F32(v) => {
let v = f32::from_bits(v);
- let value = NumCast::from(v).ok_or_else(|| {
+ NumCast::from(v).ok_or_else(|| {
ArrowError::JsonError(format!("failed to parse {v} as
{d}",))
- })?;
- builder.append_value(value)
- }
- TapeElement::I32(v) => {
- let value = NumCast::from(v).ok_or_else(|| {
- ArrowError::JsonError(format!("failed to parse {v} as
{d}",))
- })?;
- builder.append_value(value)
+ })
}
+ TapeElement::I32(v) => NumCast::from(v)
+ .ok_or_else(|| ArrowError::JsonError(format!("failed to
parse {v} as {d}",))),
TapeElement::F64(high) => match tape.get(p + 1) {
TapeElement::F32(low) => {
let v = f64::from_bits(((high as u64) << 32) | low as
u64);
- let value = NumCast::from(v).ok_or_else(|| {
+ NumCast::from(v).ok_or_else(|| {
ArrowError::JsonError(format!("failed to parse {v}
as {d}",))
- })?;
- builder.append_value(value)
+ })
}
_ => unreachable!(),
},
TapeElement::I64(high) => match tape.get(p + 1) {
TapeElement::I32(low) => {
let v = ((high as i64) << 32) | (low as u32) as i64;
- let value = NumCast::from(v).ok_or_else(|| {
+ NumCast::from(v).ok_or_else(|| {
ArrowError::JsonError(format!("failed to parse {v}
as {d}",))
- })?;
- builder.append_value(value)
+ })
}
_ => unreachable!(),
},
- _ => return Err(tape.error(*p, "primitive")),
+ _ => Err(tape.error(*p, "primitive")),
+ };
+
+ match value {
+ Ok(value) => builder.append_value(value),
+ Err(_) if self.ignore_type_conflicts => builder.append_null(),
+ Err(e) => return Err(e),
}
}
diff --git a/arrow-json/src/reader/string_array.rs
b/arrow-json/src/reader/string_array.rs
index 6cdfa06013..6b6abc21ed 100644
--- a/arrow-json/src/reader/string_array.rs
+++ b/arrow-json/src/reader/string_array.rs
@@ -24,21 +24,23 @@ use arrow_schema::ArrowError;
use itoa;
use ryu;
-use crate::reader::ArrayDecoder;
use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{ArrayDecoder, DecoderContext};
const TRUE: &str = "true";
const FALSE: &str = "false";
pub struct StringArrayDecoder<O: OffsetSizeTrait> {
coerce_primitive: bool,
+ ignore_type_conflicts: bool,
phantom: PhantomData<O>,
}
impl<O: OffsetSizeTrait> StringArrayDecoder<O> {
- pub fn new(coerce_primitive: bool) -> Self {
+ pub fn new(ctx: &DecoderContext) -> Self {
Self {
- coerce_primitive,
+ coerce_primitive: ctx.coerce_primitive(),
+ ignore_type_conflicts: ctx.ignore_type_conflicts(),
phantom: Default::default(),
}
}
@@ -73,6 +75,7 @@ impl<O: OffsetSizeTrait> ArrayDecoder for
StringArrayDecoder<O> {
// An arbitrary estimate
data_capacity += 10;
}
+ _ if self.ignore_type_conflicts => {}
_ => {
return Err(tape.error(*p, "string"));
}
@@ -126,6 +129,7 @@ impl<O: OffsetSizeTrait> ArrayDecoder for
StringArrayDecoder<O> {
}
_ => unreachable!(),
},
+ _ if self.ignore_type_conflicts => builder.append_null(),
_ => unreachable!(),
}
}
diff --git a/arrow-json/src/reader/string_view_array.rs
b/arrow-json/src/reader/string_view_array.rs
index 5364317dfd..1fc4627d3a 100644
--- a/arrow-json/src/reader/string_view_array.rs
+++ b/arrow-json/src/reader/string_view_array.rs
@@ -23,19 +23,23 @@ use arrow_array::builder::GenericByteViewBuilder;
use arrow_array::types::StringViewType;
use arrow_schema::ArrowError;
-use crate::reader::ArrayDecoder;
use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{ArrayDecoder, DecoderContext};
const TRUE: &str = "true";
const FALSE: &str = "false";
pub struct StringViewArrayDecoder {
coerce_primitive: bool,
+ ignore_type_conflicts: bool,
}
impl StringViewArrayDecoder {
- pub fn new(coerce_primitive: bool) -> Self {
- Self { coerce_primitive }
+ pub fn new(ctx: &DecoderContext) -> Self {
+ Self {
+ coerce_primitive: ctx.coerce_primitive(),
+ ignore_type_conflicts: ctx.ignore_type_conflicts(),
+ }
}
}
@@ -100,6 +104,7 @@ impl ArrayDecoder for StringViewArrayDecoder {
TapeElement::F64(_) if coerce => {
data_capacity += 10;
}
+ _ if self.ignore_type_conflicts => {} // treat type conflicts
like nulls
_ => {
return Err(tape.error(p, "string"));
}
@@ -156,6 +161,9 @@ impl ArrayDecoder for StringViewArrayDecoder {
}
_ => unreachable!(),
},
+ _ if self.ignore_type_conflicts => {
+ builder.append_null();
+ }
_ => unreachable!(),
}
}
diff --git a/arrow-json/src/reader/struct_array.rs
b/arrow-json/src/reader/struct_array.rs
index 00dc55a5fd..cfad1ed612 100644
--- a/arrow-json/src/reader/struct_array.rs
+++ b/arrow-json/src/reader/struct_array.rs
@@ -75,6 +75,7 @@ pub struct StructArrayDecoder {
data_type: DataType,
decoders: Vec<Box<dyn ArrayDecoder>>,
strict_mode: bool,
+ ignore_type_conflicts: bool,
is_nullable: bool,
struct_mode: StructMode,
field_name_to_index: Option<HashMap<String, usize>>,
@@ -110,6 +111,7 @@ impl StructArrayDecoder {
data_type: data_type.clone(),
decoders,
strict_mode: ctx.strict_mode(),
+ ignore_type_conflicts: ctx.ignore_type_conflicts(),
is_nullable,
struct_mode,
field_name_to_index,
@@ -144,6 +146,10 @@ impl ArrayDecoder for StructArrayDecoder {
nulls.append(false);
continue;
}
+ (_, Some(nulls)) if self.ignore_type_conflicts => {
+ nulls.append(false);
+ continue;
+ }
(_, _) => return Err(tape.error(*p, "{")),
};
@@ -189,6 +195,10 @@ impl ArrayDecoder for StructArrayDecoder {
nulls.append(false);
continue;
}
+ (_, Some(nulls)) if self.ignore_type_conflicts => {
+ nulls.append(false);
+ continue;
+ }
(_, _) => return Err(tape.error(*p, "[")),
};
diff --git a/arrow-json/src/reader/timestamp_array.rs
b/arrow-json/src/reader/timestamp_array.rs
index 3fe4dc07af..7a3937565f 100644
--- a/arrow-json/src/reader/timestamp_array.rs
+++ b/arrow-json/src/reader/timestamp_array.rs
@@ -25,22 +25,24 @@ use arrow_cast::parse::string_to_datetime;
use arrow_schema::{ArrowError, DataType, TimeUnit};
use chrono::TimeZone;
-use crate::reader::ArrayDecoder;
use crate::reader::tape::{Tape, TapeElement};
+use crate::reader::{ArrayDecoder, DecoderContext};
/// A specialized [`ArrayDecoder`] for timestamps
pub struct TimestampArrayDecoder<P: ArrowTimestampType, Tz: TimeZone> {
data_type: DataType,
timezone: Tz,
+ ignore_type_conflicts: bool,
// Invariant and Send
phantom: PhantomData<fn(P) -> P>,
}
impl<P: ArrowTimestampType, Tz: TimeZone> TimestampArrayDecoder<P, Tz> {
- pub fn new(data_type: &DataType, timezone: Tz) -> Self {
+ pub fn new(ctx: &DecoderContext, data_type: &DataType, timezone: Tz) ->
Self {
Self {
data_type: data_type.clone(),
timezone,
+ ignore_type_conflicts: ctx.ignore_type_conflicts(),
phantom: Default::default(),
}
}
@@ -54,10 +56,12 @@ where
fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef,
ArrowError> {
let mut builder =
PrimitiveBuilder::<P>::with_capacity(pos.len()).with_data_type(self.data_type.clone());
-
for p in pos {
- match tape.get(*p) {
- TapeElement::Null => builder.append_null(),
+ let value = match tape.get(*p) {
+ TapeElement::Null => {
+ builder.append_null();
+ continue;
+ }
TapeElement::String(idx) => {
let s = tape.get_string(idx);
let date = string_to_datetime(&self.timezone,
s).map_err(|e| {
@@ -65,43 +69,44 @@ where
"failed to parse \"{s}\" as {}: {}",
self.data_type, e
))
- })?;
+ });
- let value = match P::UNIT {
- TimeUnit::Second => date.timestamp(),
- TimeUnit::Millisecond => date.timestamp_millis(),
- TimeUnit::Microsecond => date.timestamp_micros(),
+ date.and_then(|date| match P::UNIT {
+ TimeUnit::Second => Ok(date.timestamp()),
+ TimeUnit::Millisecond => Ok(date.timestamp_millis()),
+ TimeUnit::Microsecond => Ok(date.timestamp_micros()),
TimeUnit::Nanosecond =>
date.timestamp_nanos_opt().ok_or_else(|| {
ArrowError::ParseError(format!(
"{} would overflow 64-bit signed nanoseconds",
date.to_rfc3339(),
))
- })?,
- };
- builder.append_value(value)
+ }),
+ })
}
TapeElement::Number(idx) => {
let s = tape.get_string(idx);
let b = s.as_bytes();
- let value = lexical_core::parse::<i64>(b)
+ lexical_core::parse::<i64>(b)
.or_else(|_| lexical_core::parse::<f64>(b).map(|x| x
as i64))
.map_err(|_| {
ArrowError::JsonError(format!(
"failed to parse {s} as {}",
self.data_type
))
- })?;
-
- builder.append_value(value)
+ })
}
- TapeElement::I32(v) => builder.append_value(v as i64),
+ TapeElement::I32(v) => Ok(v as i64),
TapeElement::I64(high) => match tape.get(p + 1) {
- TapeElement::I32(low) => {
- builder.append_value(((high as i64) << 32) | (low as
u32) as i64)
- }
+ TapeElement::I32(low) => Ok(((high as i64) << 32) | (low
as u32) as i64),
_ => unreachable!(),
},
- _ => return Err(tape.error(*p, "primitive")),
+ _ => Err(tape.error(*p, "primitive")),
+ };
+
+ match value {
+ Ok(value) => builder.append_value(value),
+ Err(_) if self.ignore_type_conflicts => builder.append_null(),
+ Err(e) => return Err(e),
}
}