jecsand838 commented on code in PR #8298:
URL: https://github.com/apache/arrow-rs/pull/8298#discussion_r2338265842
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -69,6 +73,77 @@ fn write_bool<W: Write + ?Sized>(out: &mut W, v: bool) ->
Result<(), ArrowError>
.map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e))
}
+/// Minimal two's-complement big-endian representation helper for Avro decimal
(bytes).
+///
+/// For positive numbers, trim leading 0x00 while the next byte's MSB is 0.
+/// For negative numbers, trim leading 0xFF while the next byte's MSB is 1.
+/// The resulting slice still encodes the same signed value.
+///
+/// See Avro spec: decimal over `bytes` uses two's-complement big-endian
+/// representation of the unscaled integer value. 1.11.1 specification.
+#[inline]
+fn minimal_twos_complement(be: &[u8]) -> &[u8] {
+ if be.is_empty() {
+ return be;
+ }
+ let mut i = 0usize;
+ let sign = (be[0] & 0x80) != 0;
+ while i + 1 < be.len() {
+ let b = be[i];
+ let next = be[i + 1];
+ let trim_pos = !sign && b == 0x00 && (next & 0x80) == 0;
+ let trim_neg = sign && b == 0xFF && (next & 0x80) != 0;
+ if trim_pos || trim_neg {
+ i += 1;
+ } else {
+ break;
+ }
+ }
+ &be[i..]
+}
+
+/// Sign-extend (or validate/truncate) big-endian integer bytes to exactly `n`
bytes.
+///
+/// If `src_be` is longer than `n`, ensure that dropped leading bytes are all
sign bytes,
+/// and that the MSB of the first kept byte matches the sign; otherwise return
an overflow error.
+/// If shorter than `n`, left-pad with the sign byte.
+///
+/// Used for Avro decimal over `fixed(N)`.
+#[inline]
+fn sign_extend_to_exact(src_be: &[u8], n: usize) -> Result<Vec<u8>,
ArrowError> {
+ let len = src_be.len();
+ let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
+ 0xFF
+ } else {
+ 0x00
+ };
+ if len == n {
+ return Ok(src_be.to_vec());
+ }
+ if len > n {
+ let extra = len - n;
+ if src_be[..extra].iter().any(|&b| b != sign_byte) {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Decimal value with {} bytes cannot be represented in {} bytes
without overflow",
+ len, n
+ )));
+ }
+ if n > 0 {
+ let first_kept = src_be[extra];
+ if ((first_kept ^ sign_byte) & 0x80) != 0 {
+ return Err(ArrowError::InvalidArgumentError(format!(
+ "Decimal value with {} bytes cannot be represented in {}
bytes without overflow",
+ len, n
+ )));
+ }
+ }
+ return Ok(src_be[extra..].to_vec());
+ }
+ let mut out = vec![sign_byte; n];
+ out[n - len..].copy_from_slice(src_be);
+ Ok(out)
+}
Review Comment:
I'd recommend an approach like this for `minimal_twos_complement` and
`sign_extend_to_exact`:
```suggestion
/// Returns the shortest suffix of `be` (big-endian two's complement) that still encodes
/// the same signed value, as used for Avro `decimal` over `bytes`.
///
/// A leading byte is redundant when it equals the sign-extension byte (`0x00` for
/// non-negative values, `0xFF` for negative ones) and the byte after it already carries
/// the same sign bit. For non-empty input at least one byte is always kept.
#[inline]
fn minimal_twos_complement(be: &[u8]) -> &[u8] {
    let Some(&head) = be.first() else {
        return be;
    };
    let negative = (head & 0x80) != 0;
    let pad = if negative { 0xFF } else { 0x00 };
    // Count how many leading bytes can be dropped: each adjacent pair (current, next)
    // must show a pure sign byte followed by a byte with the same sign bit.
    let redundant = be
        .windows(2)
        .take_while(|pair| pair[0] == pad && ((pair[1] & 0x80) != 0) == negative)
        .count();
    &be[redundant..]
}
/// Sign-extend (or validate/truncate) big-endian integer bytes to exactly
`n` bytes.
///
/// If `src_be` is longer than `n`, ensure that dropped leading bytes are
all sign bytes,
/// and that the MSB of the first kept byte matches the sign; otherwise
return an overflow error.
/// If shorter than `n`, left-pad with the sign byte.
///
/// Used for Avro decimal over `fixed(N)`.
#[inline]
fn sign_extend_to_exact<'a>(
src_be: &'_ [u8],
n: usize,
) -> Result<std::borrow::Cow<'_, [u8]>, ArrowError> {
let len = src_be.len();
let sign_byte = if len > 0 && (src_be[0] & 0x80) != 0 {
0xFF
} else {
0x00
};
if len == n {
return Ok(std::borrow::Cow::Borrowed(src_be));
}
if len > n {
let extra = len - n;
if src_be[..extra].iter().any(|&b| b != sign_byte) {
return Err(ArrowError::InvalidArgumentError(format!(
"Decimal value with {len} bytes cannot be represented in {n}
bytes without overflow",
)));
}
if n > 0 {
let first_kept = src_be[extra];
if ((first_kept ^ sign_byte) & 0x80) != 0 {
return Err(ArrowError::InvalidArgumentError(format!(
"Decimal value with {len} bytes cannot be represented in
{n} bytes without overflow",
)));
}
}
return Ok(std::borrow::Cow::Borrowed(&src_be[extra..]));
}
let pad_len = n - len;
let mut out = Vec::with_capacity(n);
out.resize(pad_len, sign_byte);
out.extend_from_slice(src_be);
Ok(std::borrow::Cow::Owned(out))
}
```
Basically in `sign_extend_to_exact` we can use `Cow::Borrowed` to borrow the
original bytes in the common cases (avoiding heap allocs and copies). Then when
`len < n` we can use padding to write once (avoiding overwrites).
Meanwhile, in the `minimal_twos_complement` function, I compute the
`sign_byte` once and check the redundant-sign-byte condition with a
single composite predicate.
Putting the optimizations together, I got some pretty impressive performance
improvements:
```
Running benches/avro_writer.rs
(target/release/deps/avro_writer-7488faba55dfea49)
write-Decimal128(bytes)/4096
time: [45.789 µs 46.102 µs 46.470 µs]
thrpt: [1008.4 MiB/s 1016.5 MiB/s 1023.4 MiB/s]
change:
time: [−21.015% −20.118% −19.148%] (p = 0.00 <
0.05)
thrpt: [+23.682% +25.185% +26.606%]
Performance has improved.
Found 1 outliers among 40 measurements (2.50%)
1 (2.50%) high mild
write-Decimal128(bytes)/8192
time: [83.798 µs 84.563 µs 85.315 µs]
thrpt: [1.0697 GiB/s 1.0792 GiB/s 1.0890 GiB/s]
change:
time: [−25.039% −23.785% −22.585%] (p = 0.00 <
0.05)
thrpt: [+29.174% +31.208% +33.403%]
Performance has improved.
write-Decimal128(bytes)/100000
time: [896.85 µs 901.37 µs 905.65 µs]
thrpt: [1.2280 GiB/s 1.2338 GiB/s 1.2400 GiB/s]
change:
time: [−29.067% −28.448% −27.701%] (p = 0.00 <
0.05)
thrpt: [+38.314% +39.758% +40.978%]
Performance has improved.
Found 1 outliers among 20 measurements (5.00%)
1 (5.00%) high severe
write-Decimal128(bytes)/1000000
time: [10.287 ms 10.387 ms 10.485 ms]
thrpt: [1.0605 GiB/s 1.0706 GiB/s 1.0809 GiB/s]
change:
time: [−28.151% −27.540% −26.881%] (p = 0.00 <
0.05)
thrpt: [+36.763% +38.007% +39.181%]
Performance has improved.
write-Decimal128(fixed16)/4096
time: [44.414 µs 44.706 µs 45.069 µs]
thrpt: [1.3595 GiB/s 1.3705 GiB/s 1.3795 GiB/s]
change:
time: [−54.217% −53.750% −53.309%] (p = 0.00 <
0.05)
thrpt: [+114.18% +116.22% +118.42%]
Performance has improved.
write-Decimal128(fixed16)/8192
time: [91.220 µs 91.851 µs 92.609 µs]
thrpt: [1.3207 GiB/s 1.3316 GiB/s 1.3408 GiB/s]
change:
time: [−53.206% −52.752% −52.260%] (p = 0.00 <
0.05)
thrpt: [+109.47% +111.65% +113.70%]
Performance has improved.
Found 2 outliers among 40 measurements (5.00%)
2 (5.00%) high mild
write-Decimal128(fixed16)/100000
time: [844.22 µs 849.13 µs 853.75 µs]
thrpt: [1.7457 GiB/s 1.7552 GiB/s 1.7654 GiB/s]
change:
time: [−62.031% −60.476% −59.490%] (p = 0.00 <
0.05)
thrpt: [+146.85% +153.01% +163.38%]
Performance has improved.
Found 2 outliers among 20 measurements (10.00%)
1 (5.00%) low mild
1 (5.00%) high severe
write-Decimal128(fixed16)/1000000
time: [9.8248 ms 9.8740 ms 9.9041 ms]
thrpt: [1.5046 GiB/s 1.5092 GiB/s 1.5167 GiB/s]
change:
time: [−56.736% −56.412% −56.075%] (p = 0.00 <
0.05)
thrpt: [+127.66% +129.42% +131.14%]
Performance has improved.
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -653,6 +893,182 @@ fn prepare_value_site_encoder<'a>(
FieldEncoder::make_encoder(values_array, value_field, plan, nullability)
}
+/// Avro `fixed` encoder for Arrow `FixedSizeBinaryArray`.
+/// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix.
+struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
+impl FixedEncoder<'_> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) ->
Result<(), ArrowError> {
+ let v = self.0.value(idx); // &[u8] of fixed width
+ out.write_all(v)
+ .map_err(|e| ArrowError::IoError(format!("write fixed bytes:
{e}"), e))
+ }
+}
+
+/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) → Avro string
(UUID).
+/// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated
form.
+struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
+impl UuidEncoder<'_> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) ->
Result<(), ArrowError> {
+ let v = self.0.value(idx);
+ if v.len() != 16 {
+ return Err(ArrowError::InvalidArgumentError(
+ "logicalType=uuid requires FixedSizeBinary(16)".into(),
+ ));
+ }
+ let u = Uuid::from_slice(v)
+ .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid
UUID bytes: {e}")))?;
+ let mut tmp = [0u8; uuid::fmt::Hyphenated::LENGTH];
+ let s = u.hyphenated().encode_lower(&mut tmp);
+ write_len_prefixed(out, s.as_bytes())
+ }
+}
Review Comment:
We should be able to optimize this by:
1. Adding a `Uuid` variant to the `FieldPlan` enum;
2. Pushing the length check on line 913 back into the
`FieldEncoder::make_encoder` and/or `FieldPlan::build` methods; and
3. Updating the `UuidEncoder::encode` method to format a lowercase hyphenated
UUID and write it as a length-prefixed string in a single write.
I got a ~25% increase in throughput with the version
below.
```suggestion
/// Avro `uuid` logical-type encoder: Arrow `FixedSizeBinary(16)` -> Avro `string`
/// holding the lowercase, hyphenated UUID text.
struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
impl UuidEncoder<'_> {
    /// Writes the value at `idx` as a length-prefixed Avro string in a single `write_all`.
    ///
    /// The array width is expected to have been validated as 16 when the encoder was
    /// built; it is only re-checked in debug builds.
    ///
    /// # Errors
    ///
    /// `InvalidArgumentError` if the bytes are not a valid UUID; `IoError` on write failure.
    fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) -> Result<(), ArrowError> {
        // Avro prefixes strings with their byte length as a zig-zag varint. A hyphenated
        // UUID is always `LENGTH` (36) bytes and zig-zag(36) = 72 = 0x48. That fits in a
        // single varint byte as long as LENGTH < 64, so the whole prefix is one constant byte.
        const _: () = assert!(uuid::fmt::Hyphenated::LENGTH < 64);
        const LEN_PREFIX: u8 = (uuid::fmt::Hyphenated::LENGTH as u8) << 1;

        debug_assert_eq!(self.0.value_length(), 16);
        let mut buf = [0u8; 1 + uuid::fmt::Hyphenated::LENGTH];
        buf[0] = LEN_PREFIX;
        let u = Uuid::from_slice(self.0.value(idx))
            .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid UUID bytes: {e}")))?;
        let _ = u.hyphenated().encode_lower(&mut buf[1..]);
        out.write_all(&buf)
            .map_err(|e| ArrowError::IoError(format!("write uuid: {e}"), e))
    }
}
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -763,4 +1179,100 @@ mod tests {
let got = encode_all(&arr, &FieldPlan::Scalar, None);
assert_bytes_eq(&got, &expected);
}
+
+ #[test]
+ fn list_encoder_int32() {
+ // Build ListArray [[1,2], [], [3]]
+ let values = Int32Array::from(vec![1, 2, 3]);
+ let offsets = vec![0, 2, 2, 3];
+ let list = ListArray::new(
+ Field::new("item", DataType::Int32, true).into(),
+ arrow_buffer::OffsetBuffer::new(offsets.into()),
+ Arc::new(values) as ArrayRef,
+ None,
+ );
+ // Avro array encoding per row
+ let mut expected = Vec::new();
+ // row 0: block len 2, items 1,2 then 0
+ expected.extend(avro_long_bytes(2));
+ expected.extend(avro_long_bytes(1));
+ expected.extend(avro_long_bytes(2));
+ expected.extend(avro_long_bytes(0));
+ // row 1: empty
+ expected.extend(avro_long_bytes(0));
+ // row 2: one item 3
+ expected.extend(avro_long_bytes(1));
+ expected.extend(avro_long_bytes(3));
+ expected.extend(avro_long_bytes(0));
+
+ let plan = FieldPlan::List {
+ items_nullability: None,
+ item_plan: Box::new(FieldPlan::Scalar),
+ };
+ let got = encode_all(&list, &plan, None);
+ assert_bytes_eq(&got, &expected);
+ }
+
+ #[test]
+ fn struct_encoder_two_fields() {
+ // Struct { a: Int32, b: Utf8 }
+ let a = Int32Array::from(vec![1, 2]);
+ let b = StringArray::from(vec!["x", "y"]);
+ let fields = Fields::from(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Utf8, true),
+ ]);
+ let struct_arr = StructArray::new(
+ fields.clone(),
+ vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
+ None,
+ );
+ let plan = FieldPlan::Struct {
+ encoders: vec![
+ FieldBinding {
+ arrow_index: 0,
+ nullability: None,
+ plan: FieldPlan::Scalar,
+ },
+ FieldBinding {
+ arrow_index: 1,
+ nullability: None,
+ plan: FieldPlan::Scalar,
+ },
+ ],
+ };
+ let got = encode_all(&struct_arr, &plan, None);
+ // Expected: rows concatenated: a then b
+ let mut expected = Vec::new();
+ expected.extend(avro_long_bytes(1)); // a=1
+ expected.extend(avro_len_prefixed_bytes(b"x")); // b="x"
+ expected.extend(avro_long_bytes(2)); // a=2
+ expected.extend(avro_len_prefixed_bytes(b"y")); // b="y"
+ assert_bytes_eq(&got, &expected);
+ }
+
+ #[test]
+ fn decimal_bytes_and_fixed() {
+ // Use Decimal128 with small positives and negatives
+ let dec = Decimal128Array::from(vec![1i128, -1i128, 0i128])
+ .with_precision_and_scale(20, 0)
+ .unwrap();
+ // bytes(decimal): minimal two's complement length-prefixed
+ let plan_bytes = FieldPlan::Decimal { size: None };
+ let got_bytes = encode_all(&dec, &plan_bytes, None);
+ // 1 -> 0x01; -1 -> 0xFF; 0 -> 0x00
+ let mut expected_bytes = Vec::new();
+ expected_bytes.extend(avro_len_prefixed_bytes(&[0x01]));
+ expected_bytes.extend(avro_len_prefixed_bytes(&[0xFF]));
+ expected_bytes.extend(avro_len_prefixed_bytes(&[0x00]));
+ assert_bytes_eq(&got_bytes, &expected_bytes);
+
+ let plan_fixed = FieldPlan::Decimal { size: Some(16) };
+ let got_fixed = encode_all(&dec, &plan_fixed, None);
+ let mut expected_fixed = Vec::new();
+ expected_fixed.extend_from_slice(&1i128.to_be_bytes());
+ expected_fixed.extend_from_slice(&(-1i128).to_be_bytes());
+ expected_fixed.extend_from_slice(&0i128.to_be_bytes());
+ assert_bytes_eq(&got_fixed, &expected_fixed);
+ }
}
Review Comment:
To @alamb's point, I'd definitely add more unit test coverage, such as these
tests for the duration encoders:
```rust
/// Test helper: the expected Avro `duration` bytes, i.e. three little-endian `u32`
/// values (months, days, milliseconds) packed back to back into 12 bytes.
fn duration_fixed12(months: u32, days: u32, millis: u32) -> [u8; 12] {
    let mut bytes = [0u8; 12];
    for (slot, part) in bytes.chunks_exact_mut(4).zip([months, days, millis]) {
        slot.copy_from_slice(&part.to_le_bytes());
    }
    bytes
}
#[test]
fn duration_encoder_year_month_happy_path() {
let arr: PrimitiveArray<IntervalYearMonthType> = vec![0i32, 1i32,
25i32].into();
let mut expected = Vec::new();
for m in [0u32, 1u32, 25u32] {
expected.extend_from_slice(&duration_fixed12(m, 0, 0));
}
let got = encode_all(&arr, &FieldPlan::Scalar, None);
assert_bytes_eq(&got, &expected);
}
#[test]
fn duration_encoder_year_month_rejects_negative() {
let arr: PrimitiveArray<IntervalYearMonthType> = vec![-1i32].into();
let field = Field::new(
"f",
DataType::Interval(IntervalUnit::YearMonth),
true,
);
let mut enc = FieldEncoder::make_encoder(&arr, &field,
&FieldPlan::Scalar, None).unwrap();
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
ArrowError::InvalidArgumentError(msg) => {
assert!(msg.contains("cannot encode negative months"))
}
other => panic!("expected InvalidArgumentError, got {other:?}"),
}
}
#[test]
fn duration_encoder_day_time_happy_path() {
let v0 = IntervalDayTimeType::make_value(2, 500); // days=2,
millis=500
let v1 = IntervalDayTimeType::make_value(0, 0);
let arr: PrimitiveArray<IntervalDayTimeType> = vec![v0, v1].into();
let mut expected = Vec::new();
expected.extend_from_slice(&duration_fixed12(0, 2, 500));
expected.extend_from_slice(&duration_fixed12(0, 0, 0));
let got = encode_all(&arr, &FieldPlan::Scalar, None);
assert_bytes_eq(&got, &expected);
}
#[test]
fn duration_encoder_day_time_rejects_negative() {
let bad = IntervalDayTimeType::make_value(-1, 0);
let arr: PrimitiveArray<IntervalDayTimeType> = vec![bad].into();
let field = Field::new(
"f",
DataType::Interval(IntervalUnit::DayTime),
true,
);
let mut enc = FieldEncoder::make_encoder(&arr, &field,
&FieldPlan::Scalar, None).unwrap();
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
ArrowError::InvalidArgumentError(msg) => {
assert!(msg.contains("cannot encode negative days"))
}
other => panic!("expected InvalidArgumentError, got {other:?}"),
}
}
#[test]
fn duration_encoder_month_day_nano_happy_path() {
let v0 = IntervalMonthDayNanoType::make_value(1, 2, 3_000_000); //
-> millis = 3
let v1 = IntervalMonthDayNanoType::make_value(0, 0, 0);
let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![v0,
v1].into();
let mut expected = Vec::new();
expected.extend_from_slice(&duration_fixed12(1, 2, 3));
expected.extend_from_slice(&duration_fixed12(0, 0, 0));
let got = encode_all(&arr, &FieldPlan::Scalar, None);
assert_bytes_eq(&got, &expected);
}
#[test]
fn duration_encoder_month_day_nano_rejects_non_ms_multiple() {
let bad = IntervalMonthDayNanoType::make_value(0, 0, 1);
let arr: PrimitiveArray<IntervalMonthDayNanoType> = vec![bad].into();
let field = Field::new(
"f",
DataType::Interval(IntervalUnit::MonthDayNano),
true,
);
let mut enc = FieldEncoder::make_encoder(&arr, &field,
&FieldPlan::Scalar, None).unwrap();
let mut out = Vec::new();
let err = enc.encode(&mut out, 0).unwrap_err();
match err {
ArrowError::InvalidArgumentError(msg) => {
assert!(msg.contains("requires whole milliseconds") ||
msg.contains("divisible"))
}
other => panic!("expected InvalidArgumentError, got {other:?}"),
}
}
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -171,15 +246,111 @@ impl<'a> FieldEncoder<'a> {
DataType::LargeBinary => {
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
}
+ DataType::FixedSizeBinary(len) => {
+ // Decide between Avro `fixed` (raw bytes) and `uuid`
logical string
+ // based on Field metadata, mirroring schema generation
rules.
+ let arr = array
+ .as_any()
+ .downcast_ref::<FixedSizeBinaryArray>()
+ .ok_or_else(|| {
+ ArrowError::SchemaError("Expected
FixedSizeBinaryArray".into())
+ })?;
+ let md = field.metadata();
+ let is_uuid = md.get("logicalType").is_some_and(|v| v ==
"uuid")
+ || (*len == 16
+ && md.get("ARROW:extension:name").is_some_and(|v| v ==
"uuid"));
+ if is_uuid {
+ if *len != 16 {
+ return Err(ArrowError::InvalidArgumentError(
+ "logicalType=uuid requires
FixedSizeBinary(16)".into(),
+ ));
+ }
+ Encoder::Uuid(UuidEncoder(arr))
+ } else {
+ Encoder::Fixed(FixedEncoder(arr))
+ }
+ }
DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
array.as_primitive::<TimestampMicrosecondType>(),
)),
+ DataType::Interval(unit) => match unit {
+ IntervalUnit::MonthDayNano => {
+
Encoder::IntervalMonthDayNano(IntervalMonthDayNanoEncoder(
+ array.as_primitive::<IntervalMonthDayNanoType>(),
+ ))
+ }
+ IntervalUnit::YearMonth => {
+ Encoder::IntervalYearMonth(IntervalYearMonthEncoder(
+ array.as_primitive::<IntervalYearMonthType>(),
+ ))
+ }
+ IntervalUnit::DayTime =>
Encoder::IntervalDayTime(IntervalDayTimeEncoder(
+ array.as_primitive::<IntervalDayTimeType>(),
+ )),
+ }
+ DataType::Duration(_) => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: Arrow Duration(TimeUnit) has no standard
Avro mapping; cast to Interval(MonthDayNano) to use Avro 'duration'".into(),
+ ));
+ }
+ // Composite or mismatched types under scalar plan
+ DataType::List(_)
+ | DataType::LargeList(_)
+ | DataType::Map(_, _)
+ | DataType::Struct(_)
+ | DataType::Dictionary(_, _)
+ | DataType::Decimal32(_, _)
+ | DataType::Decimal64(_, _)
+ | DataType::Decimal128(_, _)
+ | DataType::Decimal256(_, _) => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro scalar site incompatible with Arrow type: {:?}",
+ array.data_type()
+ )))
+ }
Review Comment:
I'm not sure these cases are reachable, given the logic in the
`FieldPlan::build` method (other than `DataType::Map`, temporarily, since there isn't
a `FieldPlan` variant for it in this PR).
You could probably just remove this arm.
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -171,15 +246,111 @@ impl<'a> FieldEncoder<'a> {
DataType::LargeBinary => {
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
}
+ DataType::FixedSizeBinary(len) => {
+ // Decide between Avro `fixed` (raw bytes) and `uuid`
logical string
+ // based on Field metadata, mirroring schema generation
rules.
+ let arr = array
+ .as_any()
+ .downcast_ref::<FixedSizeBinaryArray>()
+ .ok_or_else(|| {
+ ArrowError::SchemaError("Expected
FixedSizeBinaryArray".into())
+ })?;
+ let md = field.metadata();
+ let is_uuid = md.get("logicalType").is_some_and(|v| v ==
"uuid")
+ || (*len == 16
+ && md.get("ARROW:extension:name").is_some_and(|v| v ==
"uuid"));
+ if is_uuid {
+ if *len != 16 {
+ return Err(ArrowError::InvalidArgumentError(
+ "logicalType=uuid requires
FixedSizeBinary(16)".into(),
+ ));
+ }
+ Encoder::Uuid(UuidEncoder(arr))
+ } else {
+ Encoder::Fixed(FixedEncoder(arr))
+ }
+ }
DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
array.as_primitive::<TimestampMicrosecondType>(),
)),
+ DataType::Interval(unit) => match unit {
+ IntervalUnit::MonthDayNano => {
+
Encoder::IntervalMonthDayNano(IntervalMonthDayNanoEncoder(
+ array.as_primitive::<IntervalMonthDayNanoType>(),
+ ))
+ }
+ IntervalUnit::YearMonth => {
+ Encoder::IntervalYearMonth(IntervalYearMonthEncoder(
+ array.as_primitive::<IntervalYearMonthType>(),
+ ))
+ }
+ IntervalUnit::DayTime =>
Encoder::IntervalDayTime(IntervalDayTimeEncoder(
+ array.as_primitive::<IntervalDayTimeType>(),
+ )),
Review Comment:
Then using the `DurationEncoder` (see other comment), you could do:
```suggestion
IntervalUnit::MonthDayNano => {
Encoder::IntervalMonthDayNano(DurationEncoder(
array.as_primitive::<IntervalMonthDayNanoType>(),
))
}
IntervalUnit::YearMonth => {
Encoder::IntervalYearMonth(DurationEncoder(
array.as_primitive::<IntervalYearMonthType>(),
))
}
IntervalUnit::DayTime =>
Encoder::IntervalDayTime(DurationEncoder(
array.as_primitive::<IntervalDayTimeType>(),
)),
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -171,15 +246,111 @@ impl<'a> FieldEncoder<'a> {
DataType::LargeBinary => {
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
}
+ DataType::FixedSizeBinary(len) => {
+ // Decide between Avro `fixed` (raw bytes) and `uuid`
logical string
+ // based on Field metadata, mirroring schema generation
rules.
+ let arr = array
+ .as_any()
+ .downcast_ref::<FixedSizeBinaryArray>()
+ .ok_or_else(|| {
+ ArrowError::SchemaError("Expected
FixedSizeBinaryArray".into())
+ })?;
+ let md = field.metadata();
+ let is_uuid = md.get("logicalType").is_some_and(|v| v ==
"uuid")
+ || (*len == 16
+ && md.get("ARROW:extension:name").is_some_and(|v| v ==
"uuid"));
+ if is_uuid {
Review Comment:
Instead of just relying on a metadata hint, I wonder if it's better to
leverage the `canonical_extension_types` feature flag and canonical Arrow
extension name.
Maybe something like this?
```suggestion
// Extension-based detection (only when the feature is
enabled)
let ext_is_uuid = {
#[cfg(feature = "canonical_extension_types")]
{
matches!(field.extension_type_name(),
Some("arrow.uuid") | Some("uuid"))
}
#[cfg(not(feature = "canonical_extension_types"))]
{
false
}
};
let md_is_uuid =
field.metadata().get("logicalType").map(|s|
s.as_str()) == Some("uuid");
if ext_is_uuid || md_is_uuid {
```
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -171,15 +246,111 @@ impl<'a> FieldEncoder<'a> {
DataType::LargeBinary => {
Encoder::LargeBinary(BinaryEncoder(array.as_binary::<i64>()))
}
+ DataType::FixedSizeBinary(len) => {
+ // Decide between Avro `fixed` (raw bytes) and `uuid`
logical string
+ // based on Field metadata, mirroring schema generation
rules.
+ let arr = array
+ .as_any()
+ .downcast_ref::<FixedSizeBinaryArray>()
+ .ok_or_else(|| {
+ ArrowError::SchemaError("Expected
FixedSizeBinaryArray".into())
+ })?;
+ let md = field.metadata();
+ let is_uuid = md.get("logicalType").is_some_and(|v| v ==
"uuid")
+ || (*len == 16
+ && md.get("ARROW:extension:name").is_some_and(|v| v ==
"uuid"));
+ if is_uuid {
+ if *len != 16 {
+ return Err(ArrowError::InvalidArgumentError(
+ "logicalType=uuid requires
FixedSizeBinary(16)".into(),
+ ));
+ }
+ Encoder::Uuid(UuidEncoder(arr))
+ } else {
+ Encoder::Fixed(FixedEncoder(arr))
+ }
+ }
DataType::Timestamp(TimeUnit::Microsecond, _) =>
Encoder::Timestamp(LongEncoder(
array.as_primitive::<TimestampMicrosecondType>(),
)),
+ DataType::Interval(unit) => match unit {
+ IntervalUnit::MonthDayNano => {
+
Encoder::IntervalMonthDayNano(IntervalMonthDayNanoEncoder(
+ array.as_primitive::<IntervalMonthDayNanoType>(),
+ ))
+ }
+ IntervalUnit::YearMonth => {
+ Encoder::IntervalYearMonth(IntervalYearMonthEncoder(
+ array.as_primitive::<IntervalYearMonthType>(),
+ ))
+ }
+ IntervalUnit::DayTime =>
Encoder::IntervalDayTime(IntervalDayTimeEncoder(
+ array.as_primitive::<IntervalDayTimeType>(),
+ )),
+ }
+ DataType::Duration(_) => {
+ return Err(ArrowError::NotYetImplemented(
+ "Avro writer: Arrow Duration(TimeUnit) has no standard
Avro mapping; cast to Interval(MonthDayNano) to use Avro 'duration'".into(),
+ ));
+ }
+ // Composite or mismatched types under scalar plan
+ DataType::List(_)
+ | DataType::LargeList(_)
+ | DataType::Map(_, _)
+ | DataType::Struct(_)
+ | DataType::Dictionary(_, _)
+ | DataType::Decimal32(_, _)
+ | DataType::Decimal64(_, _)
+ | DataType::Decimal128(_, _)
+ | DataType::Decimal256(_, _) => {
+ return Err(ArrowError::SchemaError(format!(
+ "Avro scalar site incompatible with Arrow type: {:?}",
+ array.data_type()
+ )))
+ }
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Avro scalar type not yet supported: {other:?}"
)));
}
},
+ FieldPlan::Decimal {size} => match array.data_type() {
Review Comment:
Nit, but I'd move the `FieldPlan::Decimal` branch (and non-`Scalar` branches
in general) up. Keeping the `FieldPlan::Scalar` branch last (or first)
makes the code easier to follow, at least for me.
##########
arrow-avro/src/writer/encoder.rs:
##########
@@ -653,6 +893,182 @@ fn prepare_value_site_encoder<'a>(
FieldEncoder::make_encoder(values_array, value_field, plan, nullability)
}
+/// Avro `fixed` encoder for Arrow `FixedSizeBinaryArray`.
+/// Spec: a fixed is encoded as exactly `size` bytes, with no length prefix.
+struct FixedEncoder<'a>(&'a FixedSizeBinaryArray);
+impl FixedEncoder<'_> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) ->
Result<(), ArrowError> {
+ let v = self.0.value(idx); // &[u8] of fixed width
+ out.write_all(v)
+ .map_err(|e| ArrowError::IoError(format!("write fixed bytes:
{e}"), e))
+ }
+}
+
+/// Avro UUID logical type encoder: Arrow FixedSizeBinary(16) → Avro string
(UUID).
+/// Spec: uuid is a logical type over string (RFC‑4122). We output hyphenated
form.
+struct UuidEncoder<'a>(&'a FixedSizeBinaryArray);
+impl UuidEncoder<'_> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) ->
Result<(), ArrowError> {
+ let v = self.0.value(idx);
+ if v.len() != 16 {
+ return Err(ArrowError::InvalidArgumentError(
+ "logicalType=uuid requires FixedSizeBinary(16)".into(),
+ ));
+ }
+ let u = Uuid::from_slice(v)
+ .map_err(|e| ArrowError::InvalidArgumentError(format!("Invalid
UUID bytes: {e}")))?;
+ let mut tmp = [0u8; uuid::fmt::Hyphenated::LENGTH];
+ let s = u.hyphenated().encode_lower(&mut tmp);
+ write_len_prefixed(out, s.as_bytes())
+ }
+}
+
+/// Avro `duration` encoder for Arrow `Interval(IntervalUnit::MonthDayNano)`.
+/// Spec: `duration` annotates Avro fixed(12) with three **little‑endian u32**:
+/// months, days, milliseconds (no negatives).
+struct IntervalMonthDayNanoEncoder<'a>(&'a
PrimitiveArray<IntervalMonthDayNanoType>);
+impl IntervalMonthDayNanoEncoder<'_> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) ->
Result<(), ArrowError> {
+ let native = self.0.value(idx);
+ let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
+ if months < 0 || days < 0 || nanos < 0 {
+ return Err(ArrowError::InvalidArgumentError(
+ "Avro 'duration' cannot encode negative
months/days/nanoseconds".into(),
+ ));
+ }
+ if nanos % 1_000_000 != 0 {
+ return Err(ArrowError::InvalidArgumentError(
+ "Avro 'duration' requires whole milliseconds; nanoseconds must
be divisible by 1_000_000"
+ .into(),
+ ));
+ }
+ let millis = nanos / 1_000_000;
+ if millis > u32::MAX as i64 {
+ return Err(ArrowError::InvalidArgumentError(
+ "Avro 'duration' milliseconds exceed u32::MAX".into(),
+ ));
+ }
+ let mut buf = [0u8; 12];
+ buf[0..4].copy_from_slice(&(months as u32).to_le_bytes());
+ buf[4..8].copy_from_slice(&(days as u32).to_le_bytes());
+ buf[8..12].copy_from_slice(&(millis as u32).to_le_bytes());
+ out.write_all(&buf)
+ .map_err(|e| ArrowError::IoError(format!("write duration: {e}"),
e))
+ }
+}
+
+/// Avro `duration` encoder for Arrow `Interval(IntervalUnit::YearMonth)`.
+struct IntervalYearMonthEncoder<'a>(&'a PrimitiveArray<IntervalYearMonthType>);
+impl IntervalYearMonthEncoder<'_> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) ->
Result<(), ArrowError> {
+ let months_i32 = self.0.value(idx);
+
+ if months_i32 < 0 {
+ return Err(ArrowError::InvalidArgumentError(
+ "Avro 'duration' cannot encode negative months".into(),
+ ));
+ }
+
+ let mut buf = [0u8; 12];
+ buf[0..4].copy_from_slice(&(months_i32 as u32).to_le_bytes());
+ // Days and Milliseconds are zero, so their bytes are already 0.
+ // buf[4..8] is [0, 0, 0, 0]
+ // buf[8..12] is [0, 0, 0, 0]
+
+ out.write_all(&buf)
+ .map_err(|e| ArrowError::IoError(format!("write duration: {e}"),
e))
+ }
+}
+
+/// Avro `duration` encoder for Arrow `Interval(IntervalUnit::DayTime)`.
+struct IntervalDayTimeEncoder<'a>(&'a PrimitiveArray<IntervalDayTimeType>);
+impl IntervalDayTimeEncoder<'_> {
+ fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) ->
Result<(), ArrowError> {
+ // A DayTime interval is a packed (days: i32, milliseconds: i32).
+ let native = self.0.value(idx);
+ let (days, millis) = IntervalDayTimeType::to_parts(native);
+
+ if days < 0 || millis < 0 {
+ return Err(ArrowError::InvalidArgumentError(
+ "Avro 'duration' cannot encode negative days or
milliseconds".into(),
+ ));
+ }
+
+ // (months=0, days, millis)
+ let mut buf = [0u8; 12];
Review Comment:
What do you think about using an optimized generic `DurationEncoder` like
this? You should probably see a ~20% performance increase in throughput from
this.
```suggestion
/// The three unsigned components of an Avro `duration` value, in wire order.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct DurationParts {
    months: u32,
    days: u32,
    millis: u32,
}
/// Trait mapping an Arrow interval native value to Avro duration `(months,
days, millis)`.
trait IntervalToDurationParts: ArrowPrimitiveType {
fn duration_parts(native: Self::Native) -> Result<DurationParts,
ArrowError>;
}
impl IntervalToDurationParts for IntervalMonthDayNanoType {
    /// Converts `(months, days, nanoseconds)` into Avro `duration` parts.
    ///
    /// All components must be non-negative, the nanoseconds must be a whole number of
    /// milliseconds, and the resulting milliseconds must fit in a `u32`.
    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
        let (months, days, nanos) = IntervalMonthDayNanoType::to_parts(native);
        if months < 0 || days < 0 || nanos < 0 {
            return Err(ArrowError::InvalidArgumentError(
                "Avro 'duration' cannot encode negative months/days/nanoseconds".into(),
            ));
        }
        if nanos % 1_000_000 != 0 {
            return Err(ArrowError::InvalidArgumentError(
                "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000"
                    .into(),
            ));
        }
        // `nanos` is non-negative here, so the conversion can only fail on overflow.
        let millis = u32::try_from(nanos / 1_000_000).map_err(|_| {
            ArrowError::InvalidArgumentError("Avro 'duration' milliseconds exceed u32::MAX".into())
        })?;
        Ok(DurationParts {
            // Checked non-negative above, so these casts are lossless.
            months: months as u32,
            days: days as u32,
            millis,
        })
    }
}
impl IntervalToDurationParts for IntervalYearMonthType {
    /// A year-month interval is a plain month count; days and milliseconds are zero.
    /// Only negative month counts are rejected.
    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
        match u32::try_from(native) {
            Ok(months) => Ok(DurationParts {
                months,
                days: 0,
                millis: 0,
            }),
            Err(_) => Err(ArrowError::InvalidArgumentError(
                "Avro 'duration' cannot encode negative months".into(),
            )),
        }
    }
}
impl IntervalToDurationParts for IntervalDayTimeType {
    /// Converts a packed `(days, milliseconds)` value into Avro `duration` parts with
    /// `months = 0`. Both components must be non-negative.
    fn duration_parts(native: Self::Native) -> Result<DurationParts, ArrowError> {
        let (days, millis) = IntervalDayTimeType::to_parts(native);
        if days < 0 || millis < 0 {
            return Err(ArrowError::InvalidArgumentError(
                "Avro 'duration' cannot encode negative days or milliseconds".into(),
            ));
        }
        // Checked non-negative above, so these casts are lossless.
        Ok(DurationParts {
            months: 0,
            days: days as u32,
            millis: millis as u32,
        })
    }
}
/// Single generic encoder used for all three interval units.
/// Writes Avro `fixed(12)` as three little-endian u32 values in one call.
struct DurationEncoder<'a, P: ArrowPrimitiveType +
IntervalToDurationParts>(&'a PrimitiveArray<P>);
impl<'a, P: ArrowPrimitiveType + IntervalToDurationParts>
DurationEncoder<'a, P> {
#[inline(always)]
fn encode<W: Write + ?Sized>(&mut self, out: &mut W, idx: usize) ->
Result<(), ArrowError> {
let parts = P::duration_parts(self.0.value(idx))?;
let months = parts.months.to_le_bytes();
let days = parts.days.to_le_bytes();
let ms = parts.millis.to_le_bytes();
// SAFETY
// - Endianness & layout: Avro's `duration` logical type is encoded
as fixed(12)
// with three *little-endian* unsigned 32-bit integers in order:
(months, days, millis).
// We explicitly materialize exactly those 12 bytes.
// - In-bounds indexing: `to_le_bytes()` on `u32` returns `[u8; 4]`
by contract,
// therefore, the constant indices 0..=3 used below are *always*
in-bounds.
// Rust will panic on out-of-bounds indexing, but there is no such
path here;
// the compiler can also elide the bound checks for constant,
provably in-range
// indices. [std docs; Rust Performance Book on bounds-check
elimination]
// - Memory safety: The `[u8; 12]` array is built on the stack by
value, with no
// aliasing and no uninitialized memory. There is no `unsafe`.
// - I/O: `write_all(&buf)` is fallible and its `Result` is
propagated and mapped
// into `ArrowError`, so I/O errors are reported, not panicked.
// Consequently, constructing `buf` with the constant indices below
is safe and
// panic-free under these validated preconditions.
let buf = [
months[0], months[1], months[2], months[3],
days[0], days[1], days[2], days[3],
ms[0], ms[1], ms[2], ms[3],
];
out.write_all(&buf)
.map_err(|e| ArrowError::IoError(format!("write duration: {e}"),
e))
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]