This is an automated email from the ASF dual-hosted git repository.
mbrobbel pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new ca07b064db Add projection with default values support to
`RecordDecoder` (#8293)
ca07b064db is described below
commit ca07b064db5b242ae6f84c232f7b36a247cd930e
Author: Connor Sanders <[email protected]>
AuthorDate: Tue Sep 16 03:00:00 2025 -0500
Add projection with default values support to `RecordDecoder` (#8293)
# Which issue does this PR close?
This work continues arrow-avro schema resolution support and aligns
behavior with the Avro spec.
- **Related to**: #4886 (“Add Avro Support”): ongoing work to round out
the reader/decoder, including schema resolution and type promotion.
- **Follow-ups/Context**: #8292 (Add array/map/fixed schema resolution
and default value support to arrow-avro codec), #8124 (schema resolution
& type promotion for the decoder), #8223 (enum mapping for schema
resolution). These previous efforts established the foundations that
this PR extends to default values and additional resolvable types.
# Rationale for this change
Avro’s specification requires readers to materialize default values when
a field exists in the **reader** schema but not in the **writer**
schema, and to validate defaults (e.g., union defaults must match the
first branch; bytes/fixed defaults must be JSON strings; enums may
specify a default symbol for unknown writer symbols). Implementing this
behavior makes `arrow-avro` more standards‑compliant and improves
interoperability with evolving schemas.
# What changes are included in this PR?
**High‑level summary**
* **Refactor `RecordDecoder`** around a simpler **`Projector`**‑style
abstraction that consumes `ResolvedRecord` to: (a) skip writer‑only
fields, and (b) materialize reader‑only defaulted fields, reducing
branching in the hot path. (See commit subject and record decoder
changes.)
**Touched files (2):**
* `arrow-avro/src/reader/record.rs` - refactor decoder to use
precomputed mappings and defaults.
* `arrow-avro/src/reader/mod.rs` - add comprehensive tests for defaults
and error cases (see below).
# Are these changes tested?
Yes, new integration tests cover both the **happy path** and
**validation errors**:
* `test_schema_resolution_defaults_all_supported_types`: verifies that
defaults for
boolean/int/long/float/double/bytes/string/date/time/timestamp/decimal/fixed/enum/duration/uuid/array/map/nested
record and unions are materialized correctly for all rows.
* `test_schema_resolution_default_enum_invalid_symbol_errors`: invalid
enum default symbol is rejected.
* `test_schema_resolution_default_fixed_size_mismatch_errors`:
mismatched fixed/bytes default lengths are rejected.
These tests assert the Avro‑spec behavior (e.g., union defaults must
match the first branch; bytes/fixed defaults use JSON strings).
# Are there any user-facing changes?
N/A
---
arrow-avro/src/reader/mod.rs | 241 +++++++++
arrow-avro/src/reader/record.rs | 1069 +++++++++++++++++++++++++++++++++------
2 files changed, 1147 insertions(+), 163 deletions(-)
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 217366b633..bf72fc92c6 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -2085,6 +2085,245 @@ mod test {
assert!(batch.column(0).as_any().is::<StringViewArray>());
}
+ fn make_reader_schema_with_default_fields(
+ path: &str,
+ default_fields: Vec<Value>,
+ ) -> AvroSchema {
+ let mut root = load_writer_schema_json(path);
+ assert_eq!(root["type"], "record", "writer schema must be a record");
+ root.as_object_mut()
+ .expect("schema is a JSON object")
+ .insert("fields".to_string(), Value::Array(default_fields));
+ AvroSchema::new(root.to_string())
+ }
+
+ #[test]
+ fn test_schema_resolution_defaults_all_supported_types() {
+ let path = "test/data/skippable_types.avro";
+ let duration_default = "\u{0000}".repeat(12);
+ let reader_schema = make_reader_schema_with_default_fields(
+ path,
+ vec![
+
serde_json::json!({"name":"d_bool","type":"boolean","default":true}),
+ serde_json::json!({"name":"d_int","type":"int","default":42}),
+
serde_json::json!({"name":"d_long","type":"long","default":12345}),
+
serde_json::json!({"name":"d_float","type":"float","default":1.5}),
+
serde_json::json!({"name":"d_double","type":"double","default":2.25}),
+
serde_json::json!({"name":"d_bytes","type":"bytes","default":"XYZ"}),
+
serde_json::json!({"name":"d_string","type":"string","default":"hello"}),
+
serde_json::json!({"name":"d_date","type":{"type":"int","logicalType":"date"},"default":0}),
+
serde_json::json!({"name":"d_time_ms","type":{"type":"int","logicalType":"time-millis"},"default":1000}),
+
serde_json::json!({"name":"d_time_us","type":{"type":"long","logicalType":"time-micros"},"default":2000}),
+
serde_json::json!({"name":"d_ts_ms","type":{"type":"long","logicalType":"local-timestamp-millis"},"default":0}),
+
serde_json::json!({"name":"d_ts_us","type":{"type":"long","logicalType":"local-timestamp-micros"},"default":0}),
+
serde_json::json!({"name":"d_decimal","type":{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"default":""}),
+
serde_json::json!({"name":"d_fixed","type":{"type":"fixed","name":"F4","size":4},"default":"ABCD"}),
+
serde_json::json!({"name":"d_enum","type":{"type":"enum","name":"E","symbols":["A","B","C"]},"default":"A"}),
+
serde_json::json!({"name":"d_duration","type":{"type":"fixed","name":"Dur","size":12,"logicalType":"duration"},"default":duration_default}),
+
serde_json::json!({"name":"d_uuid","type":{"type":"string","logicalType":"uuid"},"default":"00000000-0000-0000-0000-000000000000"}),
+
serde_json::json!({"name":"d_array","type":{"type":"array","items":"int"},"default":[1,2,3]}),
+
serde_json::json!({"name":"d_map","type":{"type":"map","values":"long"},"default":{"a":1,"b":2}}),
+ serde_json::json!({"name":"d_record","type":{
+ "type":"record","name":"DefaultRec","fields":[
+ {"name":"x","type":"int"},
+ {"name":"y","type":["null","string"],"default":null}
+ ]
+ },"default":{"x":7}}),
+
serde_json::json!({"name":"d_nullable_null","type":["null","int"],"default":null}),
+
serde_json::json!({"name":"d_nullable_value","type":["int","null"],"default":123}),
+ ],
+ );
+ let actual = read_alltypes_with_reader_schema(path, reader_schema);
+ let num_rows = actual.num_rows();
+ assert!(num_rows > 0, "skippable_types.avro should contain rows");
+ assert_eq!(
+ actual.num_columns(),
+ 22,
+ "expected exactly our defaulted fields"
+ );
+ let mut arrays: Vec<Arc<dyn Array>> = Vec::with_capacity(22);
+ arrays.push(Arc::new(BooleanArray::from_iter(std::iter::repeat_n(
+ Some(true),
+ num_rows,
+ ))));
+ arrays.push(Arc::new(Int32Array::from_iter_values(std::iter::repeat_n(
+ 42, num_rows,
+ ))));
+ arrays.push(Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
+ 12345, num_rows,
+ ))));
+ arrays.push(Arc::new(Float32Array::from_iter_values(
+ std::iter::repeat_n(1.5f32, num_rows),
+ )));
+ arrays.push(Arc::new(Float64Array::from_iter_values(
+ std::iter::repeat_n(2.25f64, num_rows),
+ )));
+ arrays.push(Arc::new(BinaryArray::from_iter_values(
+ std::iter::repeat_n(b"XYZ".as_ref(), num_rows),
+ )));
+ arrays.push(Arc::new(StringArray::from_iter_values(
+ std::iter::repeat_n("hello", num_rows),
+ )));
+ arrays.push(Arc::new(Date32Array::from_iter_values(
+ std::iter::repeat_n(0, num_rows),
+ )));
+ arrays.push(Arc::new(Time32MillisecondArray::from_iter_values(
+ std::iter::repeat_n(1_000, num_rows),
+ )));
+ arrays.push(Arc::new(Time64MicrosecondArray::from_iter_values(
+ std::iter::repeat_n(2_000i64, num_rows),
+ )));
+ arrays.push(Arc::new(TimestampMillisecondArray::from_iter_values(
+ std::iter::repeat_n(0i64, num_rows),
+ )));
+ arrays.push(Arc::new(TimestampMicrosecondArray::from_iter_values(
+ std::iter::repeat_n(0i64, num_rows),
+ )));
+ #[cfg(feature = "small_decimals")]
+ let decimal =
Decimal64Array::from_iter_values(std::iter::repeat_n(0i64, num_rows))
+ .with_precision_and_scale(10, 2)
+ .unwrap();
+ #[cfg(not(feature = "small_decimals"))]
+ let decimal =
Decimal128Array::from_iter_values(std::iter::repeat_n(0i128, num_rows))
+ .with_precision_and_scale(10, 2)
+ .unwrap();
+ arrays.push(Arc::new(decimal));
+ let fixed_iter = std::iter::repeat_n(Some(*b"ABCD"), num_rows);
+ arrays.push(Arc::new(
+ FixedSizeBinaryArray::try_from_sparse_iter_with_size(fixed_iter,
4).unwrap(),
+ ));
+ let enum_keys = Int32Array::from_iter_values(std::iter::repeat_n(0,
num_rows));
+ let enum_values = StringArray::from_iter_values(["A", "B", "C"]);
+ let enum_arr =
+ DictionaryArray::<Int32Type>::try_new(enum_keys,
Arc::new(enum_values)).unwrap();
+ arrays.push(Arc::new(enum_arr));
+ let duration_values = std::iter::repeat_n(
+ Some(IntervalMonthDayNanoType::make_value(0, 0, 0)),
+ num_rows,
+ );
+ let duration_arr: IntervalMonthDayNanoArray =
duration_values.collect();
+ arrays.push(Arc::new(duration_arr));
+ let uuid_bytes = [0u8; 16];
+ let uuid_iter = std::iter::repeat_n(Some(uuid_bytes), num_rows);
+ arrays.push(Arc::new(
+ FixedSizeBinaryArray::try_from_sparse_iter_with_size(uuid_iter,
16).unwrap(),
+ ));
+ let item_field = Arc::new(Field::new(
+ Field::LIST_FIELD_DEFAULT_NAME,
+ DataType::Int32,
+ false,
+ ));
+ let mut list_builder =
ListBuilder::new(Int32Builder::new()).with_field(item_field);
+ for _ in 0..num_rows {
+ list_builder.values().append_value(1);
+ list_builder.values().append_value(2);
+ list_builder.values().append_value(3);
+ list_builder.append(true);
+ }
+ arrays.push(Arc::new(list_builder.finish()));
+ let values_field = Arc::new(Field::new("value", DataType::Int64,
false));
+ let mut map_builder = MapBuilder::new(
+ Some(builder::MapFieldNames {
+ entry: "entries".to_string(),
+ key: "key".to_string(),
+ value: "value".to_string(),
+ }),
+ StringBuilder::new(),
+ Int64Builder::new(),
+ )
+ .with_values_field(values_field);
+ for _ in 0..num_rows {
+ let (keys, vals) = map_builder.entries();
+ keys.append_value("a");
+ vals.append_value(1);
+ keys.append_value("b");
+ vals.append_value(2);
+ map_builder.append(true).unwrap();
+ }
+ arrays.push(Arc::new(map_builder.finish()));
+ let rec_fields: Fields = Fields::from(vec![
+ Field::new("x", DataType::Int32, false),
+ Field::new("y", DataType::Utf8, true),
+ ]);
+ let mut sb = StructBuilder::new(
+ rec_fields.clone(),
+ vec![
+ Box::new(Int32Builder::new()),
+ Box::new(StringBuilder::new()),
+ ],
+ );
+ for _ in 0..num_rows {
+ sb.field_builder::<Int32Builder>(0).unwrap().append_value(7);
+ sb.field_builder::<StringBuilder>(1).unwrap().append_null();
+ sb.append(true);
+ }
+ arrays.push(Arc::new(sb.finish()));
+ arrays.push(Arc::new(Int32Array::from_iter(std::iter::repeat_n(
+ None::<i32>,
+ num_rows,
+ ))));
+ arrays.push(Arc::new(Int32Array::from_iter_values(std::iter::repeat_n(
+ 123, num_rows,
+ ))));
+ let expected = RecordBatch::try_new(actual.schema(), arrays).unwrap();
+ assert_eq!(
+ actual, expected,
+ "defaults should materialize correctly for all fields"
+ );
+ }
+
+ #[test]
+ fn test_schema_resolution_default_enum_invalid_symbol_errors() {
+ let path = "test/data/skippable_types.avro";
+ let bad_schema = make_reader_schema_with_default_fields(
+ path,
+ vec![serde_json::json!({
+ "name":"bad_enum",
+ "type":{"type":"enum","name":"E","symbols":["A","B","C"]},
+ "default":"Z"
+ })],
+ );
+ let file = File::open(path).unwrap();
+ let res = ReaderBuilder::new()
+ .with_reader_schema(bad_schema)
+ .build(BufReader::new(file));
+ let err = res.expect_err("expected enum default validation to fail");
+ let msg = err.to_string();
+ let lower_msg = msg.to_lowercase();
+ assert!(
+ lower_msg.contains("enum")
+ && (lower_msg.contains("symbol") ||
lower_msg.contains("default")),
+ "unexpected error: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_schema_resolution_default_fixed_size_mismatch_errors() {
+ let path = "test/data/skippable_types.avro";
+ let bad_schema = make_reader_schema_with_default_fields(
+ path,
+ vec![serde_json::json!({
+ "name":"bad_fixed",
+ "type":{"type":"fixed","name":"F","size":4},
+ "default":"ABC"
+ })],
+ );
+ let file = File::open(path).unwrap();
+ let res = ReaderBuilder::new()
+ .with_reader_schema(bad_schema)
+ .build(BufReader::new(file));
+ let err = res.expect_err("expected fixed default validation to fail");
+ let msg = err.to_string();
+ let lower_msg = msg.to_lowercase();
+ assert!(
+ lower_msg.contains("fixed")
+ && (lower_msg.contains("size")
+ || lower_msg.contains("length")
+ || lower_msg.contains("does not match")),
+ "unexpected error: {msg}"
+ );
+ }
+
#[test]
fn test_alltypes_skip_writer_fields_keep_double_only() {
let file = arrow_test_data("avro/alltypes_plain.avro");
@@ -2538,6 +2777,7 @@ mod test {
let values_i128: Vec<i128> = (1..=24).map(|n| (n as i128) *
pow10).collect();
let build_expected = |dt: &DataType, values: &[i128]| -> ArrayRef {
match *dt {
+ #[cfg(feature = "small_decimals")]
DataType::Decimal32(p, s) => {
let it = values.iter().map(|&v| v as i32);
Arc::new(
@@ -2546,6 +2786,7 @@ mod test {
.unwrap(),
)
}
+ #[cfg(feature = "small_decimals")]
DataType::Decimal64(p, s) => {
let it = values.iter().map(|&v| v as i64);
Arc::new(
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 48eb601024..9ca8acb45b 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -15,27 +15,27 @@
// specific language governing permissions and limitations
// under the License.
-use crate::codec::{AvroDataType, Codec, Promotion, ResolutionInfo};
+use crate::codec::{
+ AvroDataType, AvroField, AvroLiteral, Codec, Promotion, ResolutionInfo,
ResolvedRecord,
+};
use crate::reader::block::{Block, BlockDecoder};
use crate::reader::cursor::AvroCursor;
-use crate::reader::header::Header;
-use crate::schema::*;
+use crate::schema::Nullability;
use arrow_array::builder::{
- ArrayBuilder, Decimal128Builder, Decimal256Builder, Decimal32Builder,
Decimal64Builder,
- IntervalMonthDayNanoBuilder, PrimitiveBuilder,
+ Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder,
StringViewBuilder,
};
+#[cfg(feature = "small_decimals")]
+use arrow_array::builder::{Decimal32Builder, Decimal64Builder};
use arrow_array::types::*;
use arrow_array::*;
use arrow_buffer::*;
use arrow_schema::{
- ArrowError, DataType, Field as ArrowField, FieldRef, Fields, IntervalUnit,
- Schema as ArrowSchema, SchemaRef, DECIMAL128_MAX_PRECISION,
DECIMAL256_MAX_PRECISION,
+ ArrowError, DataType, Field as ArrowField, FieldRef, Fields, Schema as
ArrowSchema, SchemaRef,
+ DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION,
};
#[cfg(feature = "small_decimals")]
use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
use std::cmp::Ordering;
-use std::collections::HashMap;
-use std::io::Read;
use std::sync::Arc;
use uuid::Uuid;
@@ -60,6 +60,29 @@ macro_rules! flush_decimal {
}};
}
+/// Macro to append a default decimal value from two's-complement big-endian
bytes
+/// into the corresponding decimal builder, with compile-time constructed
error text.
+macro_rules! append_decimal_default {
+ ($lit:expr, $builder:expr, $N:literal, $Int:ty, $name:literal) => {{
+ match $lit {
+ AvroLiteral::Bytes(b) => {
+ let ext = sign_cast_to::<$N>(b)?;
+ let val = <$Int>::from_be_bytes(ext);
+ $builder.append_value(val);
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ concat!(
+ "Default for ",
+ $name,
+ " must be bytes (two's-complement big-endian)"
+ )
+ .to_string(),
+ )),
+ }
+ }};
+}
+
#[derive(Debug)]
pub(crate) struct RecordDecoderBuilder<'a> {
data_type: &'a AvroDataType,
@@ -91,15 +114,7 @@ pub(crate) struct RecordDecoder {
schema: SchemaRef,
fields: Vec<Decoder>,
use_utf8view: bool,
- resolved: Option<ResolvedRuntime>,
-}
-
-#[derive(Debug)]
-struct ResolvedRuntime {
- /// writer field index -> reader field index (or None if writer-only)
- writer_to_reader: Arc<[Option<usize>]>,
- /// per-writer-field skipper (Some only when writer-only)
- skip_decoders: Vec<Option<Skipper>>,
+ projector: Option<Projector>,
}
impl RecordDecoder {
@@ -138,14 +153,9 @@ impl RecordDecoder {
arrow_fields.push(avro_field.field());
encodings.push(Decoder::try_new(avro_field.data_type())?);
}
- // If this record carries resolution metadata, prepare
top-level runtime helpers
- let resolved = match data_type.resolution.as_ref() {
+ let projector = match data_type.resolution.as_ref() {
Some(ResolutionInfo::Record(rec)) => {
- let skip_decoders =
build_skip_decoders(&rec.skip_fields)?;
- Some(ResolvedRuntime {
- writer_to_reader: rec.writer_to_reader.clone(),
- skip_decoders,
- })
+ Some(ProjectorBuilder::try_new(rec,
reader_fields).build()?)
}
_ => None,
};
@@ -153,7 +163,7 @@ impl RecordDecoder {
schema: Arc::new(ArrowSchema::new(arrow_fields)),
fields: encodings,
use_utf8view,
- resolved,
+ projector,
})
}
other => Err(ArrowError::ParseError(format!(
@@ -170,17 +180,10 @@ impl RecordDecoder {
/// Decode `count` records from `buf`
pub(crate) fn decode(&mut self, buf: &[u8], count: usize) -> Result<usize,
ArrowError> {
let mut cursor = AvroCursor::new(buf);
- match self.resolved.as_mut() {
- Some(runtime) => {
- // Top-level resolved record: read writer fields in writer
order,
- // project into reader fields, and skip writer-only fields
+ match self.projector.as_mut() {
+ Some(proj) => {
for _ in 0..count {
- decode_with_resolution(
- &mut cursor,
- &mut self.fields,
- &runtime.writer_to_reader,
- &mut runtime.skip_decoders,
- )?;
+ proj.project_record(&mut cursor, &mut self.fields)?;
}
}
None => {
@@ -205,24 +208,10 @@ impl RecordDecoder {
}
}
-fn decode_with_resolution(
- buf: &mut AvroCursor<'_>,
- encodings: &mut [Decoder],
- writer_to_reader: &[Option<usize>],
- skippers: &mut [Option<Skipper>],
-) -> Result<(), ArrowError> {
- for (w_idx, (target, skipper_opt)) in
writer_to_reader.iter().zip(skippers).enumerate() {
- match (*target, skipper_opt.as_mut()) {
- (Some(r_idx), _) => encodings[r_idx].decode(buf)?,
- (None, Some(sk)) => sk.skip(buf)?,
- (None, None) => {
- return Err(ArrowError::SchemaError(format!(
- "No skipper available for writer-only field at index
{w_idx}",
- )));
- }
- }
- }
- Ok(())
+#[derive(Debug)]
+struct EnumResolution {
+ mapping: Arc<[i32]>,
+ default_index: i32,
}
#[derive(Debug)]
@@ -252,7 +241,7 @@ enum Decoder {
/// String data encoded as UTF-8 bytes, but mapped to Arrow's
StringViewArray
StringView(OffsetBufferBuilder<i32>, Vec<u8>),
Array(FieldRef, OffsetBufferBuilder<i32>, Box<Decoder>),
- Record(Fields, Vec<Decoder>),
+ Record(Fields, Vec<Decoder>, Option<Projector>),
Map(
FieldRef,
OffsetBufferBuilder<i32>,
@@ -261,27 +250,16 @@ enum Decoder {
Box<Decoder>,
),
Fixed(i32, Vec<u8>),
- Enum(Vec<i32>, Arc<[String]>),
+ Enum(Vec<i32>, Arc<[String]>, Option<EnumResolution>),
Duration(IntervalMonthDayNanoBuilder),
Uuid(Vec<u8>),
+ #[cfg(feature = "small_decimals")]
Decimal32(usize, Option<usize>, Option<usize>, Decimal32Builder),
+ #[cfg(feature = "small_decimals")]
Decimal64(usize, Option<usize>, Option<usize>, Decimal64Builder),
Decimal128(usize, Option<usize>, Option<usize>, Decimal128Builder),
Decimal256(usize, Option<usize>, Option<usize>, Decimal256Builder),
Nullable(Nullability, NullBufferBuilder, Box<Decoder>),
- EnumResolved {
- indices: Vec<i32>,
- symbols: Arc<[String]>,
- mapping: Arc<[i32]>,
- default_index: i32,
- },
- /// Resolved record that needs writer->reader projection and skipping
writer-only fields
- RecordResolved {
- fields: Fields,
- encodings: Vec<Decoder>,
- writer_to_reader: Arc<[Option<usize>]>,
- skip_decoders: Vec<Option<Skipper>>,
- },
}
impl Decoder {
@@ -403,16 +381,14 @@ impl Decoder {
)
}
(Codec::Enum(symbols), _) => {
- if let Some(ResolutionInfo::EnumMapping(mapping)) =
data_type.resolution.as_ref() {
- Self::EnumResolved {
- indices: Vec::with_capacity(DEFAULT_CAPACITY),
- symbols: symbols.clone(),
+ let res = match data_type.resolution.as_ref() {
+ Some(ResolutionInfo::EnumMapping(mapping)) =>
Some(EnumResolution {
mapping: mapping.mapping.clone(),
default_index: mapping.default_index,
- }
- } else {
- Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY),
symbols.clone())
- }
+ }),
+ _ => None,
+ };
+ Self::Enum(Vec::with_capacity(DEFAULT_CAPACITY),
symbols.clone(), res)
}
(Codec::Struct(fields), _) => {
let mut arrow_fields = Vec::with_capacity(fields.len());
@@ -422,17 +398,13 @@ impl Decoder {
arrow_fields.push(avro_field.field());
encodings.push(encoding);
}
- if let Some(ResolutionInfo::Record(rec)) =
data_type.resolution.as_ref() {
- let skip_decoders = build_skip_decoders(&rec.skip_fields)?;
- Self::RecordResolved {
- fields: arrow_fields.into(),
- encodings,
- writer_to_reader: rec.writer_to_reader.clone(),
- skip_decoders,
- }
- } else {
- Self::Record(arrow_fields.into(), encodings)
- }
+ let projector =
+ if let Some(ResolutionInfo::Record(rec)) =
data_type.resolution.as_ref() {
+ Some(ProjectorBuilder::try_new(rec, fields).build()?)
+ } else {
+ None
+ };
+ Self::Record(arrow_fields.into(), encodings, projector)
}
(Codec::Map(child), _) => {
let val_field = child.field_with_name("value");
@@ -494,27 +466,263 @@ impl Decoder {
Self::Array(_, offsets, e) => {
offsets.push_length(0);
}
- Self::Record(_, e) => e.iter_mut().for_each(|e| e.append_null()),
+ Self::Record(_, e, _) => e.iter_mut().for_each(|e|
e.append_null()),
Self::Map(_, _koff, moff, _, _) => {
moff.push_length(0);
}
Self::Fixed(sz, accum) => {
accum.extend(std::iter::repeat_n(0u8, *sz as usize));
}
+ #[cfg(feature = "small_decimals")]
Self::Decimal32(_, _, _, builder) => builder.append_value(0),
+ #[cfg(feature = "small_decimals")]
Self::Decimal64(_, _, _, builder) => builder.append_value(0),
Self::Decimal128(_, _, _, builder) => builder.append_value(0),
Self::Decimal256(_, _, _, builder) =>
builder.append_value(i256::ZERO),
- Self::Enum(indices, _) => indices.push(0),
- Self::EnumResolved { indices, .. } => indices.push(0),
+ Self::Enum(indices, _, _) => indices.push(0),
Self::Duration(builder) => builder.append_null(),
Self::Nullable(_, null_buffer, inner) => {
null_buffer.append(false);
inner.append_null();
}
- Self::RecordResolved { encodings, .. } => {
- encodings.iter_mut().for_each(|e| e.append_null());
+ }
+ }
+
+ /// Append a single default literal into the decoder's buffers
+ fn append_default(&mut self, lit: &AvroLiteral) -> Result<(), ArrowError> {
+ match self {
+ Self::Nullable(_, nb, inner) => {
+ if matches!(lit, AvroLiteral::Null) {
+ nb.append(false);
+ inner.append_null();
+ Ok(())
+ } else {
+ nb.append(true);
+ inner.append_default(lit)
+ }
+ }
+ Self::Null(count) => match lit {
+ AvroLiteral::Null => {
+ *count += 1;
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Non-null default for null type".to_string(),
+ )),
+ },
+ Self::Boolean(b) => match lit {
+ AvroLiteral::Boolean(v) => {
+ b.append(*v);
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for boolean must be boolean".to_string(),
+ )),
+ },
+ Self::Int32(v) | Self::Date32(v) | Self::TimeMillis(v) => match
lit {
+ AvroLiteral::Int(i) => {
+ v.push(*i);
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for int32/date32/time-millis must be
int".to_string(),
+ )),
+ },
+ Self::Int64(v)
+ | Self::Int32ToInt64(v)
+ | Self::TimeMicros(v)
+ | Self::TimestampMillis(_, v)
+ | Self::TimestampMicros(_, v) => match lit {
+ AvroLiteral::Long(i) => {
+ v.push(*i);
+ Ok(())
+ }
+ AvroLiteral::Int(i) => {
+ v.push(*i as i64);
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for long/time-micros/timestamp must be long or
int".to_string(),
+ )),
+ },
+ Self::Float32(v) | Self::Int32ToFloat32(v) |
Self::Int64ToFloat32(v) => match lit {
+ AvroLiteral::Float(f) => {
+ v.push(*f);
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for float must be float".to_string(),
+ )),
+ },
+ Self::Float64(v)
+ | Self::Int32ToFloat64(v)
+ | Self::Int64ToFloat64(v)
+ | Self::Float32ToFloat64(v) => match lit {
+ AvroLiteral::Double(f) => {
+ v.push(*f);
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for double must be double".to_string(),
+ )),
+ },
+ Self::Binary(offsets, values) | Self::StringToBytes(offsets,
values) => match lit {
+ AvroLiteral::Bytes(b) => {
+ offsets.push_length(b.len());
+ values.extend_from_slice(b);
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for bytes must be bytes".to_string(),
+ )),
+ },
+ Self::BytesToString(offsets, values)
+ | Self::String(offsets, values)
+ | Self::StringView(offsets, values) => match lit {
+ AvroLiteral::String(s) => {
+ let b = s.as_bytes();
+ offsets.push_length(b.len());
+ values.extend_from_slice(b);
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for string must be string".to_string(),
+ )),
+ },
+ Self::Uuid(values) => match lit {
+ AvroLiteral::String(s) => {
+ let uuid = Uuid::try_parse(s).map_err(|e| {
+ ArrowError::InvalidArgumentError(format!("Invalid UUID
default: {s} ({e})"))
+ })?;
+ values.extend_from_slice(uuid.as_bytes());
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for uuid must be string".to_string(),
+ )),
+ },
+ Self::Fixed(sz, accum) => match lit {
+ AvroLiteral::Bytes(b) => {
+ if b.len() != *sz as usize {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Fixed default length {} does not match size {sz}",
+ b.len(),
+ )));
+ }
+ accum.extend_from_slice(b);
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for fixed must be bytes".to_string(),
+ )),
+ },
+ #[cfg(feature = "small_decimals")]
+ Self::Decimal32(_, _, _, builder) => {
+ append_decimal_default!(lit, builder, 4, i32, "decimal32")
+ }
+ #[cfg(feature = "small_decimals")]
+ Self::Decimal64(_, _, _, builder) => {
+ append_decimal_default!(lit, builder, 8, i64, "decimal64")
+ }
+ Self::Decimal128(_, _, _, builder) => {
+ append_decimal_default!(lit, builder, 16, i128, "decimal128")
+ }
+ Self::Decimal256(_, _, _, builder) => {
+ append_decimal_default!(lit, builder, 32, i256, "decimal256")
}
+ Self::Duration(builder) => match lit {
+ AvroLiteral::Bytes(b) => {
+ if b.len() != 12 {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Duration default must be exactly 12 bytes, got
{}",
+ b.len()
+ )));
+ }
+ let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
+ let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
+ let millis = u32::from_le_bytes([b[8], b[9], b[10],
b[11]]);
+ let nanos = (millis as i64) * 1_000_000;
+ builder.append_value(IntervalMonthDayNano::new(
+ months as i32,
+ days as i32,
+ nanos,
+ ));
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for duration must be 12-byte little-endian
months/days/millis"
+ .to_string(),
+ )),
+ },
+ Self::Array(_, offsets, inner) => match lit {
+ AvroLiteral::Array(items) => {
+ offsets.push_length(items.len());
+ for item in items {
+ inner.append_default(item)?;
+ }
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for array must be an array literal".to_string(),
+ )),
+ },
+ Self::Map(_, koff, moff, kdata, valdec) => match lit {
+ AvroLiteral::Map(entries) => {
+ moff.push_length(entries.len());
+ for (k, v) in entries {
+ let kb = k.as_bytes();
+ koff.push_length(kb.len());
+ kdata.extend_from_slice(kb);
+ valdec.append_default(v)?;
+ }
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for map must be a map/object literal".to_string(),
+ )),
+ },
+ Self::Enum(indices, symbols, _) => match lit {
+ AvroLiteral::Enum(sym) => {
+ let pos = symbols.iter().position(|s| s ==
sym).ok_or_else(|| {
+ ArrowError::InvalidArgumentError(format!(
+ "Enum default symbol {sym:?} not in reader symbols"
+ ))
+ })?;
+ indices.push(pos as i32);
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for enum must be a symbol".to_string(),
+ )),
+ },
+ Self::Record(field_meta, decoders, projector) => match lit {
+ AvroLiteral::Map(entries) => {
+ for (i, dec) in decoders.iter_mut().enumerate() {
+ let name = field_meta[i].name();
+ if let Some(sub) = entries.get(name) {
+ dec.append_default(sub)?;
+ } else if let Some(proj) = projector.as_ref() {
+ proj.project_default(dec, i)?;
+ } else {
+ dec.append_null();
+ }
+ }
+ Ok(())
+ }
+ AvroLiteral::Null => {
+ for (i, dec) in decoders.iter_mut().enumerate() {
+ if let Some(proj) = projector.as_ref() {
+ proj.project_default(dec, i)?;
+ } else {
+ dec.append_null();
+ }
+ }
+ Ok(())
+ }
+ _ => Err(ArrowError::InvalidArgumentError(
+ "Default for record must be a map/object or
null".to_string(),
+ )),
+ },
}
}
@@ -560,11 +768,14 @@ impl Decoder {
let total_items = read_blocks(buf, |cursor|
encoding.decode(cursor))?;
off.push_length(total_items);
}
- Self::Record(_, encodings) => {
+ Self::Record(_, encodings, None) => {
for encoding in encodings {
encoding.decode(buf)?;
}
}
+ Self::Record(_, encodings, Some(proj)) => {
+ proj.project_record(buf, encodings)?;
+ }
Self::Map(_, koff, moff, kdata, valdec) => {
let newly_added = read_blocks(buf, |cur| {
let kb = cur.get_bytes()?;
@@ -578,9 +789,11 @@ impl Decoder {
let fx = buf.get_fixed(*sz as usize)?;
accum.extend_from_slice(fx);
}
+ #[cfg(feature = "small_decimals")]
Self::Decimal32(_, _, size, builder) => {
decode_decimal!(size, buf, builder, 4, i32);
}
+ #[cfg(feature = "small_decimals")]
Self::Decimal64(_, _, size, builder) => {
decode_decimal!(size, buf, builder, 8, i64);
}
@@ -590,21 +803,16 @@ impl Decoder {
Self::Decimal256(_, _, size, builder) => {
decode_decimal!(size, buf, builder, 32, i256);
}
- Self::Enum(indices, _) => {
+ Self::Enum(indices, _, None) => {
indices.push(buf.get_int()?);
}
- Self::EnumResolved {
- indices,
- mapping,
- default_index,
- ..
- } => {
+ Self::Enum(indices, _, Some(res)) => {
let raw = buf.get_int()?;
let resolved = usize::try_from(raw)
.ok()
- .and_then(|idx| mapping.get(idx).copied())
+ .and_then(|idx| res.mapping.get(idx).copied())
.filter(|&idx| idx >= 0)
- .unwrap_or(*default_index);
+ .unwrap_or(res.default_index);
if resolved >= 0 {
indices.push(resolved);
} else {
@@ -635,14 +843,6 @@ impl Decoder {
}
nb.append(is_not_null);
}
- Self::RecordResolved {
- encodings,
- writer_to_reader,
- skip_decoders,
- ..
- } => {
- decode_with_resolution(buf, encodings, writer_to_reader,
skip_decoders)?;
- }
}
Ok(())
}
@@ -711,7 +911,7 @@ impl Decoder {
let offsets = flush_offsets(offsets);
Arc::new(ListArray::new(field.clone(), offsets, values, nulls))
}
- Self::Record(fields, encodings) => {
+ Self::Record(fields, encodings, _) => {
let arrays = encodings
.iter_mut()
.map(|x| x.flush(None))
@@ -764,9 +964,11 @@ impl Decoder {
.map_err(|e| ArrowError::ParseError(e.to_string()))?;
Arc::new(arr)
}
+ #[cfg(feature = "small_decimals")]
Self::Decimal32(precision, scale, _, builder) => {
flush_decimal!(builder, precision, scale, nulls,
Decimal32Array)
}
+ #[cfg(feature = "small_decimals")]
Self::Decimal64(precision, scale, _, builder) => {
flush_decimal!(builder, precision, scale, nulls,
Decimal64Array)
}
@@ -776,25 +978,13 @@ impl Decoder {
Self::Decimal256(precision, scale, _, builder) => {
flush_decimal!(builder, precision, scale, nulls,
Decimal256Array)
}
- Self::Enum(indices, symbols) => flush_dict(indices, symbols,
nulls)?,
- Self::EnumResolved {
- indices, symbols, ..
- } => flush_dict(indices, symbols, nulls)?,
+ Self::Enum(indices, symbols, _) => flush_dict(indices, symbols,
nulls)?,
Self::Duration(builder) => {
let (_, vals, _) = builder.finish().into_parts();
let vals = IntervalMonthDayNanoArray::try_new(vals, nulls)
.map_err(|e| ArrowError::ParseError(e.to_string()))?;
Arc::new(vals)
}
- Self::RecordResolved {
- fields, encodings, ..
- } => {
- let arrays = encodings
- .iter_mut()
- .map(|x| x.flush(None))
- .collect::<Result<Vec<_>, _>>()?;
- Arc::new(StructArray::new(fields.clone(), arrays, nulls))
- }
})
}
}
@@ -976,6 +1166,120 @@ fn sign_cast_to<const N: usize>(raw: &[u8]) ->
Result<[u8; N], ArrowError> {
Ok(out)
}
+#[derive(Debug)]
+struct Projector {
+ writer_to_reader: Arc<[Option<usize>]>,
+ skip_decoders: Vec<Option<Skipper>>,
+ field_defaults: Vec<Option<AvroLiteral>>,
+ default_injections: Arc<[(usize, AvroLiteral)]>,
+}
+
+#[derive(Debug)]
+struct ProjectorBuilder<'a> {
+ rec: &'a ResolvedRecord,
+ reader_fields: Arc<[AvroField]>,
+}
+
+impl<'a> ProjectorBuilder<'a> {
+ #[inline]
+ fn try_new(rec: &'a ResolvedRecord, reader_fields: &Arc<[AvroField]>) ->
Self {
+ Self {
+ rec,
+ reader_fields: reader_fields.clone(),
+ }
+ }
+
+ #[inline]
+ fn build(self) -> Result<Projector, ArrowError> {
+ let reader_fields = self.reader_fields;
+ let mut field_defaults: Vec<Option<AvroLiteral>> =
Vec::with_capacity(reader_fields.len());
+ for avro_field in reader_fields.as_ref() {
+ if let Some(ResolutionInfo::DefaultValue(lit)) =
+ avro_field.data_type().resolution.as_ref()
+ {
+ field_defaults.push(Some(lit.clone()));
+ } else {
+ field_defaults.push(None);
+ }
+ }
+ let mut default_injections: Vec<(usize, AvroLiteral)> =
+ Vec::with_capacity(self.rec.default_fields.len());
+ for &idx in self.rec.default_fields.as_ref() {
+ let lit = field_defaults
+ .get(idx)
+ .and_then(|lit| lit.clone())
+ .unwrap_or(AvroLiteral::Null);
+ default_injections.push((idx, lit));
+ }
+ let mut skip_decoders: Vec<Option<Skipper>> =
+ Vec::with_capacity(self.rec.skip_fields.len());
+ for datatype in self.rec.skip_fields.as_ref() {
+ let skipper = match datatype {
+ Some(datatype) => Some(Skipper::from_avro(datatype)?),
+ None => None,
+ };
+ skip_decoders.push(skipper);
+ }
+ Ok(Projector {
+ writer_to_reader: self.rec.writer_to_reader.clone(),
+ skip_decoders,
+ field_defaults,
+ default_injections: default_injections.into(),
+ })
+ }
+}
+
+impl Projector {
+ #[inline]
+ fn project_default(&self, decoder: &mut Decoder, index: usize) ->
Result<(), ArrowError> {
+        // Invariant: `index` is obtained by enumerating the reader's record
+        // fields (`decoders.iter_mut().enumerate()`), and `field_defaults` is
+        // built in `ProjectorBuilder::build` with exactly one element per
+        // reader field, so `index < self.field_defaults.len()` always holds
+        // and the indexing cannot panic. We only take an immutable reference
+        // via `.as_ref()`, and `self` is borrowed immutably. (No `unsafe` here.)
+ if let Some(default_literal) = self.field_defaults[index].as_ref() {
+ decoder.append_default(default_literal)
+ } else {
+ decoder.append_null();
+ Ok(())
+ }
+ }
+
+ #[inline]
+ fn project_record(
+ &mut self,
+ buf: &mut AvroCursor<'_>,
+ encodings: &mut [Decoder],
+ ) -> Result<(), ArrowError> {
+ debug_assert_eq!(
+ self.writer_to_reader.len(),
+ self.skip_decoders.len(),
+ "internal invariant: mapping and skipper lists must have equal
length"
+ );
+ for (i, (mapping, skipper_opt)) in self
+ .writer_to_reader
+ .iter()
+ .zip(self.skip_decoders.iter_mut())
+ .enumerate()
+ {
+ match (mapping, skipper_opt.as_mut()) {
+ (Some(reader_index), _) =>
encodings[*reader_index].decode(buf)?,
+ (None, Some(skipper)) => skipper.skip(buf)?,
+ (None, None) => {
+ return Err(ArrowError::SchemaError(format!(
+ "No skipper available for writer-only field at index
{i}",
+ )));
+ }
+ }
+ }
+ for (reader_index, lit) in self.default_injections.as_ref() {
+ encodings[*reader_index].append_default(lit)?;
+ }
+ Ok(())
+ }
+}
+
/// Lightweight skipper for non‑projected writer fields
/// (fields present in the writer schema but omitted by the reader/projection);
/// per Avro 1.11.1 schema resolution these fields are ignored.
@@ -1126,25 +1430,13 @@ impl Skipper {
}
}
-#[inline]
-fn build_skip_decoders(
- skip_fields: &[Option<AvroDataType>],
-) -> Result<Vec<Option<Skipper>>, ArrowError> {
- skip_fields
- .iter()
- .map(|opt| opt.as_ref().map(Skipper::from_avro).transpose())
- .collect()
-}
-
#[cfg(test)]
mod tests {
use super::*;
use crate::codec::AvroField;
- use arrow_array::{
- cast::AsArray, Array, Decimal128Array, Decimal256Array,
Decimal32Array, DictionaryArray,
- FixedSizeBinaryArray, IntervalMonthDayNanoArray, ListArray, MapArray,
StringArray,
- StructArray,
- };
+ use crate::schema::{PrimitiveType, Schema, TypeName};
+ use arrow_array::cast::AsArray;
+ use indexmap::IndexMap;
fn encode_avro_int(value: i32) -> Vec<u8> {
let mut buf = Vec::new();
@@ -1977,12 +2269,14 @@ mod tests {
vec!["B".to_string(), "C".to_string(), "A".to_string()].into();
let mapping: Arc<[i32]> = Arc::from(vec![2, 0, 1]);
let default_index: i32 = -1;
- let mut dec = Decoder::EnumResolved {
- indices: Vec::with_capacity(DEFAULT_CAPACITY),
- symbols: reader_symbols.clone(),
- mapping,
- default_index,
- };
+ let mut dec = Decoder::Enum(
+ Vec::with_capacity(DEFAULT_CAPACITY),
+ reader_symbols.clone(),
+ Some(EnumResolution {
+ mapping,
+ default_index,
+ }),
+ );
let mut data = Vec::new();
data.extend_from_slice(&encode_avro_int(0));
data.extend_from_slice(&encode_avro_int(1));
@@ -2013,12 +2307,14 @@ mod tests {
let reader_symbols: Arc<[String]> = vec!["A".to_string(),
"B".to_string()].into();
let default_index: i32 = 1;
let mapping: Arc<[i32]> = Arc::from(vec![0, 1]);
- let mut dec = Decoder::EnumResolved {
- indices: Vec::with_capacity(DEFAULT_CAPACITY),
- symbols: reader_symbols.clone(),
- mapping,
- default_index,
- };
+ let mut dec = Decoder::Enum(
+ Vec::with_capacity(DEFAULT_CAPACITY),
+ reader_symbols.clone(),
+ Some(EnumResolution {
+ mapping,
+ default_index,
+ }),
+ );
let mut data = Vec::new();
data.extend_from_slice(&encode_avro_int(0));
data.extend_from_slice(&encode_avro_int(1));
@@ -2048,12 +2344,14 @@ mod tests {
let reader_symbols: Arc<[String]> = vec!["A".to_string()].into();
let default_index: i32 = -1; // indicates no default at type-level
let mapping: Arc<[i32]> = Arc::from(vec![-1]);
- let mut dec = Decoder::EnumResolved {
- indices: Vec::with_capacity(DEFAULT_CAPACITY),
- symbols: reader_symbols,
- mapping,
- default_index,
- };
+ let mut dec = Decoder::Enum(
+ Vec::with_capacity(DEFAULT_CAPACITY),
+ reader_symbols,
+ Some(EnumResolution {
+ mapping,
+ default_index,
+ }),
+ );
let data = encode_avro_int(0);
let mut cur = AvroCursor::new(&data);
let err = dec
@@ -2069,7 +2367,7 @@ mod tests {
fn make_record_resolved_decoder(
reader_fields: &[(&str, DataType, bool)],
writer_to_reader: Vec<Option<usize>>,
- mut skip_decoders: Vec<Option<super::Skipper>>,
+ skip_decoders: Vec<Option<Skipper>>,
) -> Decoder {
let mut field_refs: Vec<FieldRef> =
Vec::with_capacity(reader_fields.len());
let mut encodings: Vec<Decoder> =
Vec::with_capacity(reader_fields.len());
@@ -2086,12 +2384,16 @@ mod tests {
encodings.push(enc);
}
let fields: Fields = field_refs.into();
- Decoder::RecordResolved {
+ Decoder::Record(
fields,
encodings,
- writer_to_reader: Arc::from(writer_to_reader),
- skip_decoders,
- }
+ Some(Projector {
+ writer_to_reader: Arc::from(writer_to_reader),
+ skip_decoders,
+ field_defaults: vec![None; reader_fields.len()],
+ default_injections: Arc::from(Vec::<(usize,
AvroLiteral)>::new()),
+ }),
+ )
}
#[test]
@@ -2257,4 +2559,445 @@ mod tests {
assert_eq!(id.value(0), 5);
assert_eq!(id.value(1), 7);
}
+
+ fn make_record_decoder_with_projector_defaults(
+ reader_fields: &[(&str, DataType, bool)],
+ field_defaults: Vec<Option<AvroLiteral>>,
+ default_injections: Vec<(usize, AvroLiteral)>,
+ writer_to_reader_len: usize,
+ ) -> Decoder {
+ assert_eq!(
+ field_defaults.len(),
+ reader_fields.len(),
+ "field_defaults must have one entry per reader field"
+ );
+ let mut field_refs: Vec<FieldRef> =
Vec::with_capacity(reader_fields.len());
+ let mut encodings: Vec<Decoder> =
Vec::with_capacity(reader_fields.len());
+ for (name, dt, nullable) in reader_fields {
+ field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(),
*nullable)));
+ let enc = match dt {
+ DataType::Int32 =>
Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY)),
+ DataType::Int64 =>
Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY)),
+ DataType::Utf8 => Decoder::String(
+ OffsetBufferBuilder::new(DEFAULT_CAPACITY),
+ Vec::with_capacity(DEFAULT_CAPACITY),
+ ),
+ other => panic!("Unsupported test field type in helper:
{other:?}"),
+ };
+ encodings.push(enc);
+ }
+ let fields: Fields = field_refs.into();
+ let skip_decoders: Vec<Option<Skipper>> =
+ (0..writer_to_reader_len).map(|_| None::<Skipper>).collect();
+ let projector = Projector {
+ writer_to_reader: Arc::from(vec![None; writer_to_reader_len]),
+ skip_decoders,
+ field_defaults,
+ default_injections: Arc::from(default_injections),
+ };
+ Decoder::Record(fields, encodings, Some(projector))
+ }
+
+ #[test]
+ fn test_default_append_int32_and_int64_from_int_and_long() {
+ let mut d_i32 = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
+ d_i32.append_default(&AvroLiteral::Int(42)).unwrap();
+ let arr = d_i32.flush(None).unwrap();
+ let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(a.len(), 1);
+ assert_eq!(a.value(0), 42);
+ let mut d_i64 = Decoder::Int64(Vec::with_capacity(DEFAULT_CAPACITY));
+ d_i64.append_default(&AvroLiteral::Int(5)).unwrap();
+ d_i64.append_default(&AvroLiteral::Long(7)).unwrap();
+ let arr64 = d_i64.flush(None).unwrap();
+ let a64 = arr64.as_any().downcast_ref::<Int64Array>().unwrap();
+ assert_eq!(a64.len(), 2);
+ assert_eq!(a64.value(0), 5);
+ assert_eq!(a64.value(1), 7);
+ }
+
+ #[test]
+ fn test_default_append_floats_and_doubles() {
+ let mut d_f32 = Decoder::Float32(Vec::with_capacity(DEFAULT_CAPACITY));
+ d_f32.append_default(&AvroLiteral::Float(1.5)).unwrap();
+ let arr32 = d_f32.flush(None).unwrap();
+ let a = arr32.as_any().downcast_ref::<Float32Array>().unwrap();
+ assert_eq!(a.value(0), 1.5);
+ let mut d_f64 = Decoder::Float64(Vec::with_capacity(DEFAULT_CAPACITY));
+ d_f64.append_default(&AvroLiteral::Double(2.25)).unwrap();
+ let arr64 = d_f64.flush(None).unwrap();
+ let b = arr64.as_any().downcast_ref::<Float64Array>().unwrap();
+ assert_eq!(b.value(0), 2.25);
+ }
+
+ #[test]
+ fn test_default_append_string_and_bytes() {
+ let mut d_str = Decoder::String(
+ OffsetBufferBuilder::new(DEFAULT_CAPACITY),
+ Vec::with_capacity(DEFAULT_CAPACITY),
+ );
+ d_str
+ .append_default(&AvroLiteral::String("hi".into()))
+ .unwrap();
+ let s_arr = d_str.flush(None).unwrap();
+ let arr = s_arr.as_any().downcast_ref::<StringArray>().unwrap();
+ assert_eq!(arr.value(0), "hi");
+ let mut d_bytes = Decoder::Binary(
+ OffsetBufferBuilder::new(DEFAULT_CAPACITY),
+ Vec::with_capacity(DEFAULT_CAPACITY),
+ );
+ d_bytes
+ .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
+ .unwrap();
+ let b_arr = d_bytes.flush(None).unwrap();
+ let barr = b_arr.as_any().downcast_ref::<BinaryArray>().unwrap();
+ assert_eq!(barr.value(0), &[1, 2, 3]);
+ let mut d_str_err = Decoder::String(
+ OffsetBufferBuilder::new(DEFAULT_CAPACITY),
+ Vec::with_capacity(DEFAULT_CAPACITY),
+ );
+ let err = d_str_err
+ .append_default(&AvroLiteral::Bytes(vec![0x61, 0x62]))
+ .unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("Default for string must be string"),
+ "unexpected error: {err:?}"
+ );
+ }
+
+ #[test]
+ fn test_default_append_nullable_int32_null_and_value() {
+ let inner = Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY));
+ let mut dec = Decoder::Nullable(
+ Nullability::NullFirst,
+ NullBufferBuilder::new(DEFAULT_CAPACITY),
+ Box::new(inner),
+ );
+ dec.append_default(&AvroLiteral::Null).unwrap();
+ dec.append_default(&AvroLiteral::Int(11)).unwrap();
+ let arr = dec.flush(None).unwrap();
+ let a = arr.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(a.len(), 2);
+ assert!(a.is_null(0));
+ assert_eq!(a.value(1), 11);
+ }
+
+ #[test]
+ fn test_default_append_array_of_ints() {
+ let list_dt =
avro_from_codec(Codec::List(Arc::new(avro_from_codec(Codec::Int32))));
+ let mut d = Decoder::try_new(&list_dt).unwrap();
+ let items = vec![
+ AvroLiteral::Int(1),
+ AvroLiteral::Int(2),
+ AvroLiteral::Int(3),
+ ];
+ d.append_default(&AvroLiteral::Array(items)).unwrap();
+ let arr = d.flush(None).unwrap();
+ let list = arr.as_any().downcast_ref::<ListArray>().unwrap();
+ assert_eq!(list.len(), 1);
+ assert_eq!(list.value_length(0), 3);
+ let vals =
list.values().as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(vals.values(), &[1, 2, 3]);
+ }
+
+ #[test]
+ fn test_default_append_map_string_to_int() {
+ let map_dt =
avro_from_codec(Codec::Map(Arc::new(avro_from_codec(Codec::Int32))));
+ let mut d = Decoder::try_new(&map_dt).unwrap();
+ let mut m: IndexMap<String, AvroLiteral> = IndexMap::new();
+ m.insert("k1".to_string(), AvroLiteral::Int(10));
+ m.insert("k2".to_string(), AvroLiteral::Int(20));
+ d.append_default(&AvroLiteral::Map(m)).unwrap();
+ let arr = d.flush(None).unwrap();
+ let map = arr.as_any().downcast_ref::<MapArray>().unwrap();
+ assert_eq!(map.len(), 1);
+ assert_eq!(map.value_length(0), 2);
+ let binding = map.value(0);
+ let entries = binding.as_any().downcast_ref::<StructArray>().unwrap();
+ let k = entries
+ .column_by_name("key")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ let v = entries
+ .column_by_name("value")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let keys: std::collections::HashSet<&str> = (0..k.len()).map(|i|
k.value(i)).collect();
+ assert_eq!(keys, ["k1", "k2"].into_iter().collect());
+ let vals: std::collections::HashSet<i32> = (0..v.len()).map(|i|
v.value(i)).collect();
+ assert_eq!(vals, [10, 20].into_iter().collect());
+ }
+
+ #[test]
+ fn test_default_append_enum_by_symbol() {
+ let symbols: Arc<[String]> = vec!["A".into(), "B".into(),
"C".into()].into();
+ let mut d = Decoder::Enum(Vec::with_capacity(DEFAULT_CAPACITY),
symbols.clone(), None);
+ d.append_default(&AvroLiteral::Enum("B".into())).unwrap();
+ let arr = d.flush(None).unwrap();
+ let dict = arr
+ .as_any()
+ .downcast_ref::<DictionaryArray<Int32Type>>()
+ .unwrap();
+ assert_eq!(dict.len(), 1);
+ let expected = Int32Array::from(vec![1]);
+ assert_eq!(dict.keys(), &expected);
+ let values = dict
+ .values()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(values.value(1), "B");
+ }
+
+ #[test]
+ fn test_default_append_uuid_and_type_error() {
+ let mut d = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
+ let uuid_str = "123e4567-e89b-12d3-a456-426614174000";
+ d.append_default(&AvroLiteral::String(uuid_str.into()))
+ .unwrap();
+ let arr_ref = d.flush(None).unwrap();
+ let arr = arr_ref
+ .as_any()
+ .downcast_ref::<FixedSizeBinaryArray>()
+ .unwrap();
+ assert_eq!(arr.value_length(), 16);
+ assert_eq!(arr.len(), 1);
+ let mut d2 = Decoder::Uuid(Vec::with_capacity(DEFAULT_CAPACITY));
+ let err = d2
+ .append_default(&AvroLiteral::Bytes(vec![0u8; 16]))
+ .unwrap_err();
+ assert!(
+ err.to_string().contains("Default for uuid must be string"),
+ "unexpected error: {err:?}"
+ );
+ }
+
+ #[test]
+ fn test_default_append_fixed_and_length_mismatch() {
+ let mut d = Decoder::Fixed(4, Vec::with_capacity(DEFAULT_CAPACITY));
+ d.append_default(&AvroLiteral::Bytes(vec![1, 2, 3, 4]))
+ .unwrap();
+ let arr_ref = d.flush(None).unwrap();
+ let arr = arr_ref
+ .as_any()
+ .downcast_ref::<FixedSizeBinaryArray>()
+ .unwrap();
+ assert_eq!(arr.value_length(), 4);
+ assert_eq!(arr.value(0), &[1, 2, 3, 4]);
+ let mut d_err = Decoder::Fixed(4,
Vec::with_capacity(DEFAULT_CAPACITY));
+ let err = d_err
+ .append_default(&AvroLiteral::Bytes(vec![1, 2, 3]))
+ .unwrap_err();
+ assert!(
+ err.to_string().contains("Fixed default length"),
+ "unexpected error: {err:?}"
+ );
+ }
+
+ #[test]
+ fn test_default_append_duration_and_length_validation() {
+ let dt = avro_from_codec(Codec::Interval);
+ let mut d = Decoder::try_new(&dt).unwrap();
+ let mut bytes = Vec::with_capacity(12);
+ bytes.extend_from_slice(&1u32.to_le_bytes());
+ bytes.extend_from_slice(&2u32.to_le_bytes());
+ bytes.extend_from_slice(&3u32.to_le_bytes());
+ d.append_default(&AvroLiteral::Bytes(bytes)).unwrap();
+ let arr_ref = d.flush(None).unwrap();
+ let arr = arr_ref
+ .as_any()
+ .downcast_ref::<IntervalMonthDayNanoArray>()
+ .unwrap();
+ assert_eq!(arr.len(), 1);
+ let v = arr.value(0);
+ assert_eq!(v.months, 1);
+ assert_eq!(v.days, 2);
+ assert_eq!(v.nanoseconds, 3_000_000);
+ let mut d_err =
Decoder::try_new(&avro_from_codec(Codec::Interval)).unwrap();
+ let err = d_err
+ .append_default(&AvroLiteral::Bytes(vec![0u8; 11]))
+ .unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("Duration default must be exactly 12 bytes"),
+ "unexpected error: {err:?}"
+ );
+ }
+
+ #[test]
+ fn test_default_append_decimal256_from_bytes() {
+ let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
+ let mut d = Decoder::try_new(&dt).unwrap();
+ let pos: [u8; 32] = [
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x30, 0x39,
+ ];
+ d.append_default(&AvroLiteral::Bytes(pos.to_vec())).unwrap();
+ let neg: [u8; 32] = [
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF,
+ 0xFF, 0xFF, 0xFF, 0x85,
+ ];
+ d.append_default(&AvroLiteral::Bytes(neg.to_vec())).unwrap();
+ let arr = d.flush(None).unwrap();
+ let dec = arr.as_any().downcast_ref::<Decimal256Array>().unwrap();
+ assert_eq!(dec.len(), 2);
+ assert_eq!(dec.value_as_string(0), "123.45");
+ assert_eq!(dec.value_as_string(1), "-1.23");
+ }
+
+ #[test]
+ fn
test_record_append_default_map_missing_fields_uses_projector_field_defaults() {
+ let field_defaults = vec![None,
Some(AvroLiteral::String("hi".into()))];
+ let mut rec = make_record_decoder_with_projector_defaults(
+ &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
+ field_defaults,
+ vec![],
+ 0,
+ );
+ let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
+ map.insert("a".to_string(), AvroLiteral::Int(7));
+ rec.append_default(&AvroLiteral::Map(map)).unwrap();
+ let arr = rec.flush(None).unwrap();
+ let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
+ let a = s
+ .column_by_name("a")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let b = s
+ .column_by_name("b")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(a.value(0), 7);
+ assert_eq!(b.value(0), "hi");
+ }
+
+ #[test]
+ fn test_record_append_default_null_uses_projector_field_defaults() {
+ let field_defaults = vec![
+ Some(AvroLiteral::Int(5)),
+ Some(AvroLiteral::String("x".into())),
+ ];
+ let mut rec = make_record_decoder_with_projector_defaults(
+ &[("a", DataType::Int32, false), ("b", DataType::Utf8, false)],
+ field_defaults,
+ vec![],
+ 0,
+ );
+ rec.append_default(&AvroLiteral::Null).unwrap();
+ let arr = rec.flush(None).unwrap();
+ let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
+ let a = s
+ .column_by_name("a")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let b = s
+ .column_by_name("b")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(a.value(0), 5);
+ assert_eq!(b.value(0), "x");
+ }
+
+ #[test]
+ fn
test_record_append_default_missing_fields_without_projector_defaults_yields_type_nulls_or_empties(
+ ) {
+ let fields = vec![("a", DataType::Int32, true), ("b", DataType::Utf8,
true)];
+ let mut field_refs: Vec<FieldRef> = Vec::new();
+ let mut encoders: Vec<Decoder> = Vec::new();
+ for (name, dt, nullable) in &fields {
+ field_refs.push(Arc::new(ArrowField::new(*name, dt.clone(),
*nullable)));
+ }
+ let enc_a = Decoder::Nullable(
+ Nullability::NullSecond,
+ NullBufferBuilder::new(DEFAULT_CAPACITY),
+ Box::new(Decoder::Int32(Vec::with_capacity(DEFAULT_CAPACITY))),
+ );
+ let enc_b = Decoder::Nullable(
+ Nullability::NullSecond,
+ NullBufferBuilder::new(DEFAULT_CAPACITY),
+ Box::new(Decoder::String(
+ OffsetBufferBuilder::new(DEFAULT_CAPACITY),
+ Vec::with_capacity(DEFAULT_CAPACITY),
+ )),
+ );
+ encoders.push(enc_a);
+ encoders.push(enc_b);
+ let projector = Projector {
+ writer_to_reader: Arc::from(vec![]),
+ skip_decoders: vec![],
+ field_defaults: vec![None, None], // no defaults -> append_null
+ default_injections: Arc::from(Vec::<(usize, AvroLiteral)>::new()),
+ };
+ let mut rec = Decoder::Record(field_refs.into(), encoders,
Some(projector));
+ let mut map: IndexMap<String, AvroLiteral> = IndexMap::new();
+ map.insert("a".to_string(), AvroLiteral::Int(9));
+ rec.append_default(&AvroLiteral::Map(map)).unwrap();
+ let arr = rec.flush(None).unwrap();
+ let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
+ let a = s
+ .column_by_name("a")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let b = s
+ .column_by_name("b")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert!(a.is_valid(0));
+ assert_eq!(a.value(0), 9);
+ assert!(b.is_null(0));
+ }
+
+ #[test]
+ fn test_projector_default_injection_when_writer_lacks_fields() {
+ let defaults = vec![None, None];
+ let injections = vec![
+ (0, AvroLiteral::Int(99)),
+ (1, AvroLiteral::String("alice".into())),
+ ];
+ let mut rec = make_record_decoder_with_projector_defaults(
+ &[
+ ("id", DataType::Int32, false),
+ ("name", DataType::Utf8, false),
+ ],
+ defaults,
+ injections,
+ 0,
+ );
+ rec.decode(&mut AvroCursor::new(&[])).unwrap();
+ let arr = rec.flush(None).unwrap();
+ let s = arr.as_any().downcast_ref::<StructArray>().unwrap();
+ let id = s
+ .column_by_name("id")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ let name = s
+ .column_by_name("name")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap();
+ assert_eq!(id.value(0), 99);
+ assert_eq!(name.value(0), "alice");
+ }
}