This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5361d2ee09 fix: avro_to_arrow: Handle avro nested nullable struct
(union) (#7663)
5361d2ee09 is described below
commit 5361d2ee090156eed4a1d91f6530695588a9405a
Author: Samrose <[email protected]>
AuthorDate: Wed Oct 4 09:59:09 2023 -0700
fix: avro_to_arrow: Handle avro nested nullable struct (union) (#7663)
Corrects handling of nullable structs, i.e. Avro unions of "null" and a
record, including when they are nested inside other records or arrays.
Signed-off-by: 🐼 Samrose Ahmed 🐼 <[email protected]>
---
datafusion/core/Cargo.toml | 1 +
.../datasource/avro_to_arrow/arrow_array_reader.rs | 669 +++++++++++++++++++--
.../core/src/datasource/avro_to_arrow/schema.rs | 60 +-
datafusion/sqllogictest/test_files/avro.slt | 8 +-
4 files changed, 670 insertions(+), 68 deletions(-)
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index d84d6a13c3..1db5d55baf 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -109,6 +109,7 @@ rand_distr = "0.4.3"
regex = "1.5.4"
rstest = "0.18.0"
rust_decimal = { version = "1.27.0", features = ["tokio-pg"] }
+serde_json = "1"
test-utils = { path = "../../test-utils" }
thiserror = "1.0.37"
tokio-postgres = "0.7.7"
diff --git a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
index f983e26d48..fd91ea1cc5 100644
--- a/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
@@ -82,7 +82,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
fields, mut lookup, ..
}) => {
for field in fields {
- Self::child_schema_lookup(&field.schema, &mut lookup)?;
+ Self::child_schema_lookup(&field.name, &field.schema, &mut
lookup)?;
}
Ok(lookup)
}
@@ -93,27 +93,51 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
}
fn child_schema_lookup<'b>(
+ parent_field_name: &str,
schema: &AvroSchema,
schema_lookup: &'b mut BTreeMap<String, usize>,
) -> Result<&'b BTreeMap<String, usize>> {
match schema {
- AvroSchema::Record(RecordSchema {
- name,
- fields,
- lookup,
- ..
- }) => {
+ AvroSchema::Union(us) => {
+ let has_nullable = us
+ .find_schema_with_known_schemata::<apache_avro::Schema>(
+ &Value::Null,
+ None,
+ &None,
+ )
+ .is_some();
+ let sub_schemas = us.variants();
+ if has_nullable && sub_schemas.len() == 2 {
+ if let Some(sub_schema) =
+ sub_schemas.iter().find(|&s| !matches!(s,
AvroSchema::Null))
+ {
+ Self::child_schema_lookup(
+ parent_field_name,
+ sub_schema,
+ schema_lookup,
+ )?;
+ }
+ }
+ }
+ AvroSchema::Record(RecordSchema { fields, lookup, .. }) => {
lookup.iter().for_each(|(field_name, pos)| {
schema_lookup
- .insert(format!("{}.{}", name.fullname(None),
field_name), *pos);
+ .insert(format!("{}.{}", parent_field_name,
field_name), *pos);
});
for field in fields {
- Self::child_schema_lookup(&field.schema, schema_lookup)?;
+ let sub_parent_field_name =
+ format!("{}.{}", parent_field_name, field.name);
+ Self::child_schema_lookup(
+ &sub_parent_field_name,
+ &field.schema,
+ schema_lookup,
+ )?;
}
}
AvroSchema::Array(schema) => {
- Self::child_schema_lookup(schema, schema_lookup)?;
+ let sub_parent_field_name = format!("{}.element",
parent_field_name);
+ Self::child_schema_lookup(&sub_parent_field_name, schema,
schema_lookup)?;
}
_ => (),
}
@@ -147,7 +171,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
let projection = self.projection.clone().unwrap_or_default();
- let arrays = self.build_struct_array(&rows, self.schema.fields(),
&projection);
+ let arrays =
+ self.build_struct_array(&rows, "", self.schema.fields(),
&projection);
let projected_fields = if projection.is_empty() {
self.schema.fields().clone()
} else {
@@ -305,6 +330,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
for row in rows {
if let Some(value) = self.field_lookup(col_name, row) {
+ let value = maybe_resolve_union(value);
// value can be an array or a scalar
let vals: Vec<Option<String>> = if let Value::String(v) =
value {
vec![Some(v.to_string())]
@@ -444,6 +470,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
/// Build a nested GenericListArray from a list of unnested `Value`s
fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
&self,
+ parent_field_name: &str,
rows: &[&Value],
list_field: &Field,
) -> ArrowResult<ArrayRef> {
@@ -530,13 +557,19 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.collect::<LargeStringArray>()
.into_data(),
DataType::List(field) => {
- let child =
- self.build_nested_list_array::<i32>(&flatten_values(rows),
field)?;
+ let child = self.build_nested_list_array::<i32>(
+ parent_field_name,
+ &flatten_values(rows),
+ field,
+ )?;
child.to_data()
}
DataType::LargeList(field) => {
- let child =
- self.build_nested_list_array::<i64>(&flatten_values(rows),
field)?;
+ let child = self.build_nested_list_array::<i64>(
+ parent_field_name,
+ &flatten_values(rows),
+ field,
+ )?;
child.to_data()
}
DataType::Struct(fields) => {
@@ -554,16 +587,22 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
let null_struct_array = vec![("null".to_string(),
Value::Null)];
let rows: Vec<&Vec<(String, Value)>> = rows
.iter()
+ .map(|v| maybe_resolve_union(v))
.flat_map(|row| {
if let Value::Array(values) = row {
- values.iter().for_each(|_| {
- bit_util::set_bit(&mut null_buffer,
struct_index);
- struct_index += 1;
- });
values
.iter()
+ .map(maybe_resolve_union)
.map(|v| match v {
- Value::Record(record) => record,
+ Value::Record(record) => {
+ bit_util::set_bit(&mut null_buffer,
struct_index);
+ struct_index += 1;
+ record
+ }
+ Value::Null => {
+ struct_index += 1;
+ &null_struct_array
+ }
other => panic!("expected Record, got
{other:?}"),
})
.collect::<Vec<&Vec<(String, Value)>>>()
@@ -573,7 +612,11 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
}
})
.collect();
- let arrays = self.build_struct_array(&rows, fields, &[])?;
+
+ let sub_parent_field_name =
+ format!("{}.{}", parent_field_name, list_field.name());
+ let arrays =
+ self.build_struct_array(&rows, &sub_parent_field_name,
fields, &[])?;
let data_type = DataType::Struct(fields.clone());
ArrayDataBuilder::new(data_type)
.len(rows.len())
@@ -610,6 +653,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
fn build_struct_array(
&self,
rows: RecordSlice,
+ parent_field_name: &str,
struct_fields: &Fields,
projection: &[String],
) -> ArrowResult<Vec<ArrayRef>> {
@@ -617,78 +661,83 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
.iter()
.filter(|field| projection.is_empty() ||
projection.contains(field.name()))
.map(|field| {
+ let field_path = if parent_field_name.is_empty() {
+ field.name().to_string()
+ } else {
+ format!("{}.{}", parent_field_name, field.name())
+ };
let arr = match field.data_type() {
DataType::Null => Arc::new(NullArray::new(rows.len())) as
ArrayRef,
- DataType::Boolean => self.build_boolean_array(rows,
field.name()),
+ DataType::Boolean => self.build_boolean_array(rows,
&field_path),
DataType::Float64 => {
- self.build_primitive_array::<Float64Type>(rows,
field.name())
+ self.build_primitive_array::<Float64Type>(rows,
&field_path)
}
DataType::Float32 => {
- self.build_primitive_array::<Float32Type>(rows,
field.name())
+ self.build_primitive_array::<Float32Type>(rows,
&field_path)
}
DataType::Int64 => {
- self.build_primitive_array::<Int64Type>(rows,
field.name())
+ self.build_primitive_array::<Int64Type>(rows,
&field_path)
}
DataType::Int32 => {
- self.build_primitive_array::<Int32Type>(rows,
field.name())
+ self.build_primitive_array::<Int32Type>(rows,
&field_path)
}
DataType::Int16 => {
- self.build_primitive_array::<Int16Type>(rows,
field.name())
+ self.build_primitive_array::<Int16Type>(rows,
&field_path)
}
DataType::Int8 => {
- self.build_primitive_array::<Int8Type>(rows,
field.name())
+ self.build_primitive_array::<Int8Type>(rows,
&field_path)
}
DataType::UInt64 => {
- self.build_primitive_array::<UInt64Type>(rows,
field.name())
+ self.build_primitive_array::<UInt64Type>(rows,
&field_path)
}
DataType::UInt32 => {
- self.build_primitive_array::<UInt32Type>(rows,
field.name())
+ self.build_primitive_array::<UInt32Type>(rows,
&field_path)
}
DataType::UInt16 => {
- self.build_primitive_array::<UInt16Type>(rows,
field.name())
+ self.build_primitive_array::<UInt16Type>(rows,
&field_path)
}
DataType::UInt8 => {
- self.build_primitive_array::<UInt8Type>(rows,
field.name())
+ self.build_primitive_array::<UInt8Type>(rows,
&field_path)
}
// TODO: this is incomplete
DataType::Timestamp(unit, _) => match unit {
TimeUnit::Second => self
.build_primitive_array::<TimestampSecondType>(
rows,
- field.name(),
+ &field_path,
),
TimeUnit::Microsecond => self
.build_primitive_array::<TimestampMicrosecondType>(
rows,
- field.name(),
+ &field_path,
),
TimeUnit::Millisecond => self
.build_primitive_array::<TimestampMillisecondType>(
rows,
- field.name(),
+ &field_path,
),
TimeUnit::Nanosecond => self
.build_primitive_array::<TimestampNanosecondType>(
rows,
- field.name(),
+ &field_path,
),
},
DataType::Date64 => {
- self.build_primitive_array::<Date64Type>(rows,
field.name())
+ self.build_primitive_array::<Date64Type>(rows,
&field_path)
}
DataType::Date32 => {
- self.build_primitive_array::<Date32Type>(rows,
field.name())
+ self.build_primitive_array::<Date32Type>(rows,
&field_path)
}
DataType::Time64(unit) => match unit {
TimeUnit::Microsecond => self
.build_primitive_array::<Time64MicrosecondType>(
rows,
- field.name(),
+ &field_path,
),
TimeUnit::Nanosecond => self
.build_primitive_array::<Time64NanosecondType>(
rows,
- field.name(),
+ &field_path,
),
t => {
return Err(ArrowError::SchemaError(format!(
@@ -698,14 +747,11 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
},
DataType::Time32(unit) => match unit {
TimeUnit::Second => self
- .build_primitive_array::<Time32SecondType>(
- rows,
- field.name(),
- ),
+ .build_primitive_array::<Time32SecondType>(rows,
&field_path),
TimeUnit::Millisecond => self
.build_primitive_array::<Time32MillisecondType>(
rows,
- field.name(),
+ &field_path,
),
t => {
return Err(ArrowError::SchemaError(format!(
@@ -716,7 +762,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
DataType::Utf8 | DataType::LargeUtf8 => Arc::new(
rows.iter()
.map(|row| {
- let maybe_value =
self.field_lookup(field.name(), row);
+ let maybe_value =
self.field_lookup(&field_path, row);
match maybe_value {
None => Ok(None),
Some(v) => resolve_string(v),
@@ -728,7 +774,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
DataType::Binary | DataType::LargeBinary => Arc::new(
rows.iter()
.map(|row| {
- let maybe_value =
self.field_lookup(field.name(), row);
+ let maybe_value =
self.field_lookup(&field_path, row);
maybe_value.and_then(resolve_bytes)
})
.collect::<BinaryArray>(),
@@ -737,7 +783,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
DataType::FixedSizeBinary(ref size) => {
Arc::new(FixedSizeBinaryArray::try_from_sparse_iter_with_size(
rows.iter().map(|row| {
- let maybe_value =
self.field_lookup(field.name(), row);
+ let maybe_value =
self.field_lookup(&field_path, row);
maybe_value.and_then(|v| resolve_fixed(v,
*size as usize))
}),
*size,
@@ -746,18 +792,19 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
DataType::List(ref list_field) => {
match list_field.data_type() {
DataType::Dictionary(ref key_ty, _) => {
- self.build_wrapped_list_array(rows,
field.name(), key_ty)?
+ self.build_wrapped_list_array(rows,
&field_path, key_ty)?
}
_ => {
// extract rows by name
let extracted_rows = rows
.iter()
.map(|row| {
- self.field_lookup(field.name(), row)
+ self.field_lookup(&field_path, row)
.unwrap_or(&Value::Null)
})
.collect::<Vec<&Value>>();
self.build_nested_list_array::<i32>(
+ &field_path,
&extracted_rows,
list_field,
)?
@@ -767,7 +814,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
DataType::Dictionary(ref key_ty, ref val_ty) => self
.build_string_dictionary_array(
rows,
- field.name(),
+ &field_path,
key_ty,
val_ty,
)?,
@@ -775,21 +822,31 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
let len = rows.len();
let num_bytes = bit_util::ceil(len, 8);
let mut null_buffer =
MutableBuffer::from_len_zeroed(num_bytes);
+ let empty_vec = vec![];
let struct_rows = rows
.iter()
.enumerate()
- .map(|(i, row)| (i,
self.field_lookup(field.name(), row)))
+ .map(|(i, row)| (i, self.field_lookup(&field_path,
row)))
.map(|(i, v)| {
- if let Some(Value::Record(value)) = v {
- bit_util::set_bit(&mut null_buffer, i);
- value
- } else {
- panic!("expected struct got {v:?}");
+ let v = v.map(maybe_resolve_union);
+ match v {
+ Some(Value::Record(value)) => {
+ bit_util::set_bit(&mut null_buffer, i);
+ value
+ }
+ None | Some(Value::Null) => &empty_vec,
+ other => {
+ panic!("expected struct got
{other:?}");
+ }
}
})
.collect::<Vec<&Vec<(String, Value)>>>();
- let arrays =
- self.build_struct_array(&struct_rows, fields,
&[])?;
+ let arrays = self.build_struct_array(
+ &struct_rows,
+ &field_path,
+ fields,
+ &[],
+ )?;
// construct a struct array's data in order to set
null buffer
let data_type = DataType::Struct(fields.clone());
let data = ArrayDataBuilder::new(data_type)
@@ -1019,6 +1076,7 @@ mod test {
use crate::arrow::datatypes::{Field, TimeUnit};
use crate::datasource::avro_to_arrow::{Reader, ReaderBuilder};
use arrow::datatypes::DataType;
+ use datafusion_common::assert_batches_eq;
use datafusion_common::cast::{
as_int32_array, as_int64_array, as_list_array,
as_timestamp_microsecond_array,
};
@@ -1079,7 +1137,7 @@ mod test {
let a_array = as_list_array(batch.column(col_id_index)).unwrap();
assert_eq!(
*a_array.data_type(),
- DataType::List(Arc::new(Field::new("bigint", DataType::Int64,
true)))
+ DataType::List(Arc::new(Field::new("element", DataType::Int64,
true)))
);
let array = a_array.value(0);
assert_eq!(*array.data_type(), DataType::Int64);
@@ -1101,6 +1159,497 @@ mod test {
assert_eq!(batch.num_rows(), 3);
}
+ #[test]
+ fn test_complex_list() {
+ let schema = apache_avro::Schema::parse_str(
+ r#"
+ {
+ "type": "record",
+ "name": "r1",
+ "fields": [
+ {
+ "name": "headers",
+ "type": ["null", {
+ "type": "array",
+ "items": ["null",{
+ "name":"r2",
+ "type": "record",
+ "fields":[
+ {"name":"name", "type": ["null", "string"],
"default": null},
+ {"name":"value", "type": ["null", "string"],
"default": null}
+ ]
+ }]
+ }],
+ "default": null
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+ let r1 = apache_avro::to_value(serde_json::json!({
+ "headers": [
+ {
+ "name": "a",
+ "value": "b"
+ }
+ ]
+ }))
+ .unwrap()
+ .resolve(&schema)
+ .unwrap();
+
+ let mut w = apache_avro::Writer::new(&schema, vec![]);
+ w.append(r1).unwrap();
+ let bytes = w.into_inner().unwrap();
+
+ let mut reader = ReaderBuilder::new()
+ .read_schema()
+ .with_batch_size(2)
+ .build(std::io::Cursor::new(bytes))
+ .unwrap();
+
+ let batch = reader.next().unwrap().unwrap();
+ assert_eq!(batch.num_rows(), 1);
+ assert_eq!(batch.num_columns(), 1);
+ let expected = [
+ "+-----------------------+",
+ "| headers |",
+ "+-----------------------+",
+ "| [{name: a, value: b}] |",
+ "+-----------------------+",
+ ];
+ assert_batches_eq!(expected, &[batch]);
+ }
+
+ #[test]
+ fn test_complex_struct() {
+ let schema = apache_avro::Schema::parse_str(
+ r#"
+ {
+ "type": "record",
+ "name": "r1",
+ "fields": [
+ {
+ "name": "dns",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "r13",
+ "fields": [
+ {
+ "name": "answers",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": [
+ "null",
+ {
+ "type": "record",
+ "name": "r292",
+ "fields": [
+ {
+ "name": "class",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "data",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "name",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "ttl",
+ "type": ["null", "long"],
+ "default": null
+ },
+ {
+ "name": "type",
+ "type": ["null", "string"],
+ "default": null
+ }
+ ]
+ }
+ ]
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "header_flags",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": ["null", "string"]
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "id",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "op_code",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "question",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "r288",
+ "fields": [
+ {
+ "name": "class",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "name",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "registered_domain",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "subdomain",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "top_level_domain",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "type",
+ "type": ["null", "string"],
+ "default": null
+ }
+ ]
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "resolved_ip",
+ "type": [
+ "null",
+ {
+ "type": "array",
+ "items": ["null", "string"]
+ }
+ ],
+ "default": null
+ },
+ {
+ "name": "response_code",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "type",
+ "type": ["null", "string"],
+ "default": null
+ }
+ ]
+ }
+ ],
+ "default": null
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+
+ let jv1 = serde_json::json!({
+ "dns": {
+ "answers": [
+ {
+ "data": "CHNlY3VyaXR5BnVidW50dQMjb20AAAEAAQAAAAgABLl9vic=",
+ "type": "1"
+ },
+ {
+ "data": "CHNlY3VyaXR5BnVidW50dQNjb20AAAEAABAAAAgABLl9viQ=",
+ "type": "1"
+ },
+ {
+ "data": "CHNlT3VyaXR5BnVidW50dQNjb20AAAEAAQAAAAgABFu9Wyc=",
+ "type": "1"
+ }
+ ],
+ "question": {
+ "name": "security.ubuntu.com",
+ "type": "A"
+ },
+ "resolved_ip": [
+ "67.43.156.1",
+ "67.43.156.2",
+ "67.43.156.3"
+ ],
+ "response_code": "0"
+ }
+ });
+ let r1 = apache_avro::to_value(jv1)
+ .unwrap()
+ .resolve(&schema)
+ .unwrap();
+
+ let mut w = apache_avro::Writer::new(&schema, vec![]);
+ w.append(r1).unwrap();
+ let bytes = w.into_inner().unwrap();
+
+ let mut reader = ReaderBuilder::new()
+ .read_schema()
+ .with_batch_size(1)
+ .build(std::io::Cursor::new(bytes))
+ .unwrap();
+
+ let batch = reader.next().unwrap().unwrap();
+ assert_eq!(batch.num_rows(), 1);
+ assert_eq!(batch.num_columns(), 1);
+
+ let expected = [
+
"+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+ "| dns
[...]
+
"+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+ "| {answers: [{class: , data:
CHNlY3VyaXR5BnVidW50dQMjb20AAAEAAQAAAAgABLl9vic=, name: , ttl: , type: 1},
{class: , data: CHNlY3VyaXR5BnVidW50dQNjb20AAAEAABAAAAgABLl9viQ=, name: , ttl:
, type: 1}, {class: , data: CHNlT3VyaXR5BnVidW50dQNjb20AAAEAAQAAAAgABFu9Wyc=,
name: , ttl: , type: 1}], header_flags: , id: , op_code: , question: {class: ,
name: security.ubuntu.com, registered_domain: , subdomain: , top_level_domain:
, type: A}, resolved_ip: [67.43.156.1, 67.43.156.2, 67.43.15 [...]
+
"+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+ ];
+ assert_batches_eq!(expected, &[batch]);
+ }
+
+ #[test]
+ fn test_deep_nullable_struct() {
+ let schema = apache_avro::Schema::parse_str(
+ r#"
+ {
+ "type": "record",
+ "name": "r1",
+ "fields": [
+ {
+ "name": "col1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "r2",
+ "fields": [
+ {
+ "name": "col2",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "r3",
+ "fields": [
+ {
+ "name": "col3",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "r4",
+ "fields": [
+ {
+ "name": "col4",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "r5",
+ "fields": [
+ {
+ "name": "col5",
+ "type": ["null", "string"]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ "#,
+ )
+ .unwrap();
+ let r1 = apache_avro::to_value(serde_json::json!({
+ "col1": {
+ "col2": {
+ "col3": {
+ "col4": {
+ "col5": "hello"
+ }
+ }
+ }
+ }
+ }))
+ .unwrap()
+ .resolve(&schema)
+ .unwrap();
+ let r2 = apache_avro::to_value(serde_json::json!({
+ "col1": {
+ "col2": {
+ "col3": {
+ "col4": {
+ "col5": null
+ }
+ }
+ }
+ }
+ }))
+ .unwrap()
+ .resolve(&schema)
+ .unwrap();
+ let r3 = apache_avro::to_value(serde_json::json!({
+ "col1": {
+ "col2": {
+ "col3": null
+ }
+ }
+ }))
+ .unwrap()
+ .resolve(&schema)
+ .unwrap();
+ let r4 = apache_avro::to_value(serde_json::json!({
+ "col1": null
+ }))
+ .unwrap()
+ .resolve(&schema)
+ .unwrap();
+
+ let mut w = apache_avro::Writer::new(&schema, vec![]);
+ w.append(r1).unwrap();
+ w.append(r2).unwrap();
+ w.append(r3).unwrap();
+ w.append(r4).unwrap();
+ let bytes = w.into_inner().unwrap();
+
+ let mut reader = ReaderBuilder::new()
+ .read_schema()
+ .with_batch_size(4)
+ .build(std::io::Cursor::new(bytes))
+ .unwrap();
+
+ let batch = reader.next().unwrap().unwrap();
+
+ let expected = [
+ "+---------------------------------------+",
+ "| col1 |",
+ "+---------------------------------------+",
+ "| {col2: {col3: {col4: {col5: hello}}}} |",
+ "| {col2: {col3: {col4: {col5: }}}} |",
+ "| {col2: {col3: }} |",
+ "| |",
+ "+---------------------------------------+",
+ ];
+ assert_batches_eq!(expected, &[batch]);
+ }
+
+ #[test]
+ fn test_avro_nullable_struct() {
+ let schema = apache_avro::Schema::parse_str(
+ r#"
+ {
+ "type": "record",
+ "name": "r1",
+ "fields": [
+ {
+ "name": "col1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "r2",
+ "fields": [
+ {
+ "name": "col2",
+ "type": ["null", "string"]
+ }
+ ]
+ }
+ ],
+ "default": null
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+ let r1 = apache_avro::to_value(serde_json::json!({
+ "col1": null
+ }))
+ .unwrap()
+ .resolve(&schema)
+ .unwrap();
+ let r2 = apache_avro::to_value(serde_json::json!({
+ "col1": {
+ "col2": "hello"
+ }
+ }))
+ .unwrap()
+ .resolve(&schema)
+ .unwrap();
+ let r3 = apache_avro::to_value(serde_json::json!({
+ "col1": {
+ "col2": null
+ }
+ }))
+ .unwrap()
+ .resolve(&schema)
+ .unwrap();
+
+ let mut w = apache_avro::Writer::new(&schema, vec![]);
+ w.append(r1).unwrap();
+ w.append(r2).unwrap();
+ w.append(r3).unwrap();
+ let bytes = w.into_inner().unwrap();
+
+ let mut reader = ReaderBuilder::new()
+ .read_schema()
+ .with_batch_size(3)
+ .build(std::io::Cursor::new(bytes))
+ .unwrap();
+ let batch = reader.next().unwrap().unwrap();
+ assert_eq!(batch.num_rows(), 3);
+ assert_eq!(batch.num_columns(), 1);
+
+ let expected = [
+ "+---------------+",
+ "| col1 |",
+ "+---------------+",
+ "| |",
+ "| {col2: hello} |",
+ "| {col2: } |",
+ "+---------------+",
+ ];
+ assert_batches_eq!(expected, &[batch]);
+ }
+
#[test]
fn test_avro_iterator() {
let reader = build_reader("alltypes_plain.avro", 5);
diff --git a/datafusion/core/src/datasource/avro_to_arrow/schema.rs
b/datafusion/core/src/datasource/avro_to_arrow/schema.rs
index f15e378cc6..761e6b6268 100644
--- a/datafusion/core/src/datasource/avro_to_arrow/schema.rs
+++ b/datafusion/core/src/datasource/avro_to_arrow/schema.rs
@@ -35,7 +35,7 @@ pub fn to_arrow_schema(avro_schema: &apache_avro::Schema) ->
Result<Schema> {
schema_fields.push(schema_to_field_with_props(
&field.schema,
Some(&field.name),
- false,
+ field.is_nullable(),
Some(external_props(&field.schema)),
)?)
}
@@ -73,7 +73,7 @@ fn schema_to_field_with_props(
AvroSchema::Bytes => DataType::Binary,
AvroSchema::String => DataType::Utf8,
AvroSchema::Array(item_schema) => DataType::List(Arc::new(
- schema_to_field_with_props(item_schema, None, false, None)?,
+ schema_to_field_with_props(item_schema, Some("element"), false,
None)?,
)),
AvroSchema::Map(value_schema) => {
let value_field =
@@ -116,7 +116,7 @@ fn schema_to_field_with_props(
DataType::Union(UnionFields::new(type_ids, fields),
UnionMode::Dense)
}
}
- AvroSchema::Record(RecordSchema { name, fields, .. }) => {
+ AvroSchema::Record(RecordSchema { fields, .. }) => {
let fields: Result<_> = fields
.iter()
.map(|field| {
@@ -129,7 +129,7 @@ fn schema_to_field_with_props(
}*/
schema_to_field_with_props(
&field.schema,
- Some(&format!("{}.{}", name.fullname(None),
field.name)),
+ Some(&field.name),
false,
Some(props),
)
@@ -442,6 +442,58 @@ mod test {
assert_eq!(arrow_schema.unwrap(), expected);
}
+ #[test]
+ fn test_nested_schema() {
+ let avro_schema = apache_avro::Schema::parse_str(
+ r#"
+ {
+ "type": "record",
+ "name": "r1",
+ "fields": [
+ {
+ "name": "col1",
+ "type": [
+ "null",
+ {
+ "type": "record",
+ "name": "r2",
+ "fields": [
+ {
+ "name": "col2",
+ "type": "string"
+ },
+ {
+ "name": "col3",
+ "type": ["null", "string"],
+ "default": null
+ }
+ ]
+ }
+ ],
+ "default": null
+ }
+ ]
+ }"#,
+ )
+ .unwrap();
+ // should not use Avro Record names.
+ let expected_arrow_schema = Schema::new(vec![Field::new(
+ "col1",
+ arrow::datatypes::DataType::Struct(
+ vec![
+ Field::new("col2", Utf8, false),
+ Field::new("col3", Utf8, true),
+ ]
+ .into(),
+ ),
+ true,
+ )]);
+ assert_eq!(
+ to_arrow_schema(&avro_schema).unwrap(),
+ expected_arrow_schema
+ );
+ }
+
#[test]
fn test_non_record_schema() {
let arrow_schema = to_arrow_schema(&AvroSchema::String);
diff --git a/datafusion/sqllogictest/test_files/avro.slt
b/datafusion/sqllogictest/test_files/avro.slt
index 3309cd1cf6..5cd268e8ef 100644
--- a/datafusion/sqllogictest/test_files/avro.slt
+++ b/datafusion/sqllogictest/test_files/avro.slt
@@ -225,11 +225,11 @@ SELECT id, CAST(string_col AS varchar) FROM
alltypes_plain_multi_files
1 1
# test avro nested records
-query ??
-SELECT f1, f2 FROM nested_records
+query ????
+SELECT f1, f2, f3, f4 FROM nested_records
----
-{ns2.record2.f1_1: aaa, ns2.record2.f1_2: 10, ns2.record2.f1_3:
{ns3.record3.f1_3_1: 3.14}} [{ns4.record4.f2_1: true, ns4.record4.f2_2: 1.2},
{ns4.record4.f2_1: true, ns4.record4.f2_2: 2.2}]
-{ns2.record2.f1_1: bbb, ns2.record2.f1_2: 20, ns2.record2.f1_3:
{ns3.record3.f1_3_1: 3.14}} [{ns4.record4.f2_1: false, ns4.record4.f2_2: 10.2}]
+{f1_1: aaa, f1_2: 10, f1_3: {f1_3_1: 3.14}} [{f2_1: true, f2_2: 1.2}, {f2_1:
true, f2_2: 2.2}] {f3_1: xyz} [{f4_1: 200}, ]
+{f1_1: bbb, f1_2: 20, f1_3: {f1_3_1: 3.14}} [{f2_1: false, f2_2: 10.2}] NULL
[, {f4_1: 300}]
# test avro enum
query TTT