This is an automated email from the ASF dual-hosted git repository.
kriskras99 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new cefc678 chore: Split `writer` module into several submodules (#498)
cefc678 is described below
commit cefc678b69213ca2aea0adc02e43d2bdb16c7897
Author: Kriskras99 <[email protected]>
AuthorDate: Tue Mar 3 21:29:18 2026 +0100
chore: Split `writer` module into several submodules (#498)
---
avro/src/lib.rs | 5 +-
avro/src/writer/datum.rs | 387 +++++++++++++++++++++
avro/src/writer/mod.rs | 706 +--------------------------------------
avro/src/writer/single_object.rs | 397 ++++++++++++++++++++++
4 files changed, 794 insertions(+), 701 deletions(-)
diff --git a/avro/src/lib.rs b/avro/src/lib.rs
index a56aaad..3a2981f 100644
--- a/avro/src/lib.rs
+++ b/avro/src/lib.rs
@@ -99,8 +99,9 @@ pub use schema::Schema;
pub use serde::{AvroSchema, AvroSchemaComponent, from_value, to_value};
pub use uuid::Uuid;
pub use writer::{
- Clearable, GenericSingleObjectWriter, SpecificSingleObjectWriter, Writer, WriterBuilder,
- to_avro_datum, to_avro_datum_schemata, write_avro_datum_ref,
+ Clearable, Writer, WriterBuilder,
+ datum::{to_avro_datum, to_avro_datum_schemata, write_avro_datum_ref},
+ single_object::{GenericSingleObjectWriter, SpecificSingleObjectWriter},
};
#[cfg(feature = "derive")]
diff --git a/avro/src/writer/datum.rs b/avro/src/writer/datum.rs
new file mode 100644
index 0000000..14a40a2
--- /dev/null
+++ b/avro/src/writer/datum.rs
@@ -0,0 +1,387 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::io::Write;
+
+use serde::Serialize;
+
+use crate::{
+ AvroResult, Schema,
+ encode::{encode, encode_internal},
+ error::Details,
+ schema::{NamesRef, ResolvedOwnedSchema, ResolvedSchema},
+ serde::ser_schema::SchemaAwareWriteSerializer,
+ types::Value,
+};
+
+/// Encode a value into raw Avro data, also performs schema validation.
+///
+/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
+/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
+/// you are doing, instead.
+pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
+ let mut buffer = Vec::new();
+ write_avro_datum(schema, value, &mut buffer)?;
+ Ok(buffer)
+}
+
+/// Write the referenced [Serialize]able object to the provided [Write] object.
+///
+/// Returns a result with the number of bytes written.
+///
+/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
+/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
+/// if you don't know what you are doing, instead.
+pub fn write_avro_datum_ref<T: Serialize, W: Write>(
+ schema: &Schema,
+ names: &NamesRef,
+ data: &T,
+ writer: &mut W,
+) -> AvroResult<usize> {
+ let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None);
+ data.serialize(&mut serializer)
+}
+
+/// Encode a value into raw Avro data, also performs schema validation.
+///
+/// If the provided `schema` is incomplete then its dependencies must be
+/// provided in `schemata`
+pub fn to_avro_datum_schemata<T: Into<Value>>(
+ schema: &Schema,
+ schemata: Vec<&Schema>,
+ value: T,
+) -> AvroResult<Vec<u8>> {
+ let mut buffer = Vec::new();
+ write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
+ Ok(buffer)
+}
+
+/// Encode a value into raw Avro data, also performs schema validation.
+///
+/// This is an internal function which gets the bytes buffer where to write as parameter instead of
+/// creating a new one like `to_avro_datum`.
+pub(super) fn write_avro_datum<T: Into<Value>, W: Write>(
+ schema: &Schema,
+ value: T,
+ writer: &mut W,
+) -> AvroResult<()> {
+ let avro = value.into();
+ if !avro.validate(schema) {
+ return Err(Details::Validation.into());
+ }
+ encode(&avro, schema, writer)?;
+ Ok(())
+}
+
+pub(super) fn write_avro_datum_schemata<T: Into<Value>>(
+ schema: &Schema,
+ schemata: Vec<&Schema>,
+ value: T,
+ buffer: &mut Vec<u8>,
+) -> AvroResult<usize> {
+ let avro = value.into();
+ let rs = ResolvedSchema::try_from(schemata)?;
+ let names = rs.get_names();
+ let enclosing_namespace = schema.namespace();
+ if let Some(_err) = avro.validate_internal(schema, names, enclosing_namespace) {
+ return Err(Details::Validation.into());
+ }
+ encode_internal(&avro, schema, names, enclosing_namespace, buffer)
+}
+
+pub(super) fn write_value_ref_owned_resolved<W: Write>(
+ resolved_schema: &ResolvedOwnedSchema,
+ value: &Value,
+ writer: &mut W,
+) -> AvroResult<usize> {
+ let root_schema = resolved_schema.get_root_schema();
+ if let Some(reason) = value.validate_internal(
+ root_schema,
+ resolved_schema.get_names(),
+ root_schema.namespace(),
+ ) {
+ return Err(Details::ValidationWithReason {
+ value: value.clone(),
+ schema: root_schema.clone(),
+ reason,
+ }
+ .into());
+ }
+ encode_internal(
+ value,
+ root_schema,
+ resolved_schema.get_names(),
+ root_schema.namespace(),
+ writer,
+ )
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
+
+ use apache_avro_test_helper::TestResult;
+
+ use crate::{
+ Days, Decimal, Duration, Millis, Months,
+ schema::{DecimalSchema, FixedSchema, InnerDecimalSchema, Name},
+ types::Record,
+ util::zig_i64,
+ };
+
+ use super::*;
+
+ const SCHEMA: &str = r#"
+ {
+ "type": "record",
+ "name": "test",
+ "fields": [
+ {
+ "name": "a",
+ "type": "long",
+ "default": 42
+ },
+ {
+ "name": "b",
+ "type": "string"
+ }
+ ]
+ }
+ "#;
+
+ const UNION_SCHEMA: &str = r#"["null", "long"]"#;
+
+ #[test]
+ fn test_to_avro_datum() -> TestResult {
+ let schema = Schema::parse_str(SCHEMA)?;
+ let mut record = Record::new(&schema).unwrap();
+ record.put("a", 27i64);
+ record.put("b", "foo");
+
+ let mut expected = Vec::new();
+ zig_i64(27, &mut expected)?;
+ zig_i64(3, &mut expected)?;
+ expected.extend([b'f', b'o', b'o']);
+
+ assert_eq!(to_avro_datum(&schema, record)?, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn avro_rs_193_write_avro_datum_ref() -> TestResult {
+ #[derive(Serialize)]
+ struct TestStruct {
+ a: i64,
+ b: String,
+ }
+
+ let schema = Schema::parse_str(SCHEMA)?;
+ let mut writer: Vec<u8> = Vec::new();
+ let data = TestStruct {
+ a: 27,
+ b: "foo".to_string(),
+ };
+
+ let mut expected = Vec::new();
+ zig_i64(27, &mut expected)?;
+ zig_i64(3, &mut expected)?;
+ expected.extend([b'f', b'o', b'o']);
+
+ let bytes = write_avro_datum_ref(&schema, &HashMap::new(), &data, &mut writer)?;
+
+ assert_eq!(bytes, expected.len());
+ assert_eq!(writer, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_union_not_null() -> TestResult {
+ let schema = Schema::parse_str(UNION_SCHEMA)?;
+ let union = Value::Union(1, Box::new(Value::Long(3)));
+
+ let mut expected = Vec::new();
+ zig_i64(1, &mut expected)?;
+ zig_i64(3, &mut expected)?;
+
+ assert_eq!(to_avro_datum(&schema, union)?, expected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_union_null() -> TestResult {
+ let schema = Schema::parse_str(UNION_SCHEMA)?;
+ let union = Value::Union(0, Box::new(Value::Null));
+
+ let mut expected = Vec::new();
+ zig_i64(0, &mut expected)?;
+
+ assert_eq!(to_avro_datum(&schema, union)?, expected);
+
+ Ok(())
+ }
+
+ fn logical_type_test<T: Into<Value> + Clone>(
+ schema_str: &'static str,
+
+ expected_schema: &Schema,
+ value: Value,
+
+ raw_schema: &Schema,
+ raw_value: T,
+ ) -> TestResult {
+ let schema = Schema::parse_str(schema_str)?;
+ assert_eq!(&schema, expected_schema);
+ // The serialized format should be the same as the schema.
+ let ser = to_avro_datum(&schema, value.clone())?;
+ let raw_ser = to_avro_datum(raw_schema, raw_value)?;
+ assert_eq!(ser, raw_ser);
+
+ // Should deserialize from the schema into the logical type.
+ let mut r = ser.as_slice();
+ let de = crate::from_avro_datum(&schema, &mut r, None)?;
+ assert_eq!(de, value);
+ Ok(())
+ }
+
+ #[test]
+ fn date() -> TestResult {
+ logical_type_test(
+ r#"{"type": "int", "logicalType": "date"}"#,
+ &Schema::Date,
+ Value::Date(1_i32),
+ &Schema::Int,
+ 1_i32,
+ )
+ }
+
+ #[test]
+ fn time_millis() -> TestResult {
+ logical_type_test(
+ r#"{"type": "int", "logicalType": "time-millis"}"#,
+ &Schema::TimeMillis,
+ Value::TimeMillis(1_i32),
+ &Schema::Int,
+ 1_i32,
+ )
+ }
+
+ #[test]
+ fn time_micros() -> TestResult {
+ logical_type_test(
+ r#"{"type": "long", "logicalType": "time-micros"}"#,
+ &Schema::TimeMicros,
+ Value::TimeMicros(1_i64),
+ &Schema::Long,
+ 1_i64,
+ )
+ }
+
+ #[test]
+ fn timestamp_millis() -> TestResult {
+ logical_type_test(
+ r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
+ &Schema::TimestampMillis,
+ Value::TimestampMillis(1_i64),
+ &Schema::Long,
+ 1_i64,
+ )
+ }
+
+ #[test]
+ fn timestamp_micros() -> TestResult {
+ logical_type_test(
+ r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
+ &Schema::TimestampMicros,
+ Value::TimestampMicros(1_i64),
+ &Schema::Long,
+ 1_i64,
+ )
+ }
+
+ #[test]
+ fn decimal_fixed() -> TestResult {
+ let size = 30;
+ let fixed = FixedSchema {
+ name: Name::new("decimal")?,
+ aliases: None,
+ doc: None,
+ size,
+ attributes: Default::default(),
+ };
+ let inner = InnerDecimalSchema::Fixed(fixed.clone());
+ let value = vec![0u8; size];
+ logical_type_test(
+ r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
+ &Schema::Decimal(DecimalSchema {
+ precision: 20,
+ scale: 5,
+ inner,
+ }),
+ Value::Decimal(Decimal::from(value.clone())),
+ &Schema::Fixed(fixed),
+ Value::Fixed(size, value),
+ )
+ }
+
+ #[test]
+ fn decimal_bytes() -> TestResult {
+ let value = vec![0u8; 10];
+ logical_type_test(
+ r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
+ &Schema::Decimal(DecimalSchema {
+ precision: 4,
+ scale: 3,
+ inner: InnerDecimalSchema::Bytes,
+ }),
+ Value::Decimal(Decimal::from(value.clone())),
+ &Schema::Bytes,
+ value,
+ )
+ }
+
+ #[test]
+ fn duration() -> TestResult {
+ let inner = Schema::Fixed(FixedSchema {
+ name: Name::new("duration")?,
+ aliases: None,
+ doc: None,
+ size: 12,
+ attributes: Default::default(),
+ });
+ let value = Value::Duration(Duration::new(
+ Months::new(256),
+ Days::new(512),
+ Millis::new(1024),
+ ));
+ logical_type_test(
+ r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
+ &Schema::Duration(FixedSchema {
+ name: Name::try_from("duration").expect("Name is valid"),
+ aliases: None,
+ doc: None,
+ size: 12,
+ attributes: Default::default(),
+ }),
+ value,
+ &inner,
+ Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
+ )
+ }
+}
diff --git a/avro/src/writer/mod.rs b/avro/src/writer/mod.rs
index fe264a1..20f19a3 100644
--- a/avro/src/writer/mod.rs
+++ b/avro/src/writer/mod.rs
@@ -20,15 +20,15 @@ use crate::{
AvroResult, Codec, Error,
encode::{encode, encode_internal, encode_to_vec},
error::Details,
- headers::{HeaderBuilder, RabinFingerprintHeader},
- schema::{NamesRef, ResolvedOwnedSchema, ResolvedSchema, Schema},
- serde::{AvroSchema, ser_schema::SchemaAwareWriteSerializer},
+ schema::{ResolvedSchema, Schema},
+ serde::ser_schema::SchemaAwareWriteSerializer,
types::Value,
};
use serde::Serialize;
-use std::{
- collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, ops::RangeInclusive,
-};
+use std::{collections::HashMap, io::Write, mem::ManuallyDrop};
+
+pub mod datum;
+pub mod single_object;
const DEFAULT_BLOCK_SIZE: usize = 16000;
const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
@@ -614,263 +614,6 @@ impl<W: Write> Drop for Writer<'_, W> {
}
}
-/// Encode a value into raw Avro data, also performs schema validation.
-///
-/// This is an internal function which gets the bytes buffer where to write as parameter instead of
-/// creating a new one like `to_avro_datum`.
-fn write_avro_datum<T: Into<Value>, W: Write>(
- schema: &Schema,
- value: T,
- writer: &mut W,
-) -> Result<(), Error> {
- let avro = value.into();
- if !avro.validate(schema) {
- return Err(Details::Validation.into());
- }
- encode(&avro, schema, writer)?;
- Ok(())
-}
-
-fn write_avro_datum_schemata<T: Into<Value>>(
- schema: &Schema,
- schemata: Vec<&Schema>,
- value: T,
- buffer: &mut Vec<u8>,
-) -> AvroResult<usize> {
- let avro = value.into();
- let rs = ResolvedSchema::try_from(schemata)?;
- let names = rs.get_names();
- let enclosing_namespace = schema.namespace();
- if let Some(_err) = avro.validate_internal(schema, names, enclosing_namespace) {
- return Err(Details::Validation.into());
- }
- encode_internal(&avro, schema, names, enclosing_namespace, buffer)
-}
-
-/// Writer that encodes messages according to the single object encoding v1 spec
-/// Uses an API similar to the current File Writer
-/// Writes all object bytes at once, and drains internal buffer
-pub struct GenericSingleObjectWriter {
- buffer: Vec<u8>,
- resolved: ResolvedOwnedSchema,
-}
-
-impl GenericSingleObjectWriter {
- pub fn new_with_capacity(
- schema: &Schema,
- initial_buffer_cap: usize,
- ) -> AvroResult<GenericSingleObjectWriter> {
- let header_builder = RabinFingerprintHeader::from_schema(schema);
- Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
- }
-
- pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
- schema: &Schema,
- initial_buffer_cap: usize,
- header_builder: HB,
- ) -> AvroResult<GenericSingleObjectWriter> {
- let mut buffer = Vec::with_capacity(initial_buffer_cap);
- let header = header_builder.build_header();
- buffer.extend_from_slice(&header);
-
- Ok(GenericSingleObjectWriter {
- buffer,
- resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
- })
- }
-
- const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
-
- /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
- pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
- let original_length = self.buffer.len();
- if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
- Err(Details::IllegalSingleObjectWriterState.into())
- } else {
- write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
- writer
- .write_all(&self.buffer)
- .map_err(Details::WriteBytes)?;
- let len = self.buffer.len();
- self.buffer.truncate(original_length);
- Ok(len)
- }
- }
-
- /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header
- pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
- self.write_value_ref(&v, writer)
- }
-}
-
-/// Writer that encodes messages according to the single object encoding v1 spec
-pub struct SpecificSingleObjectWriter<T>
-where
- T: AvroSchema,
-{
- resolved: ResolvedOwnedSchema,
- header: Vec<u8>,
- _model: PhantomData<T>,
-}
-
-impl<T> SpecificSingleObjectWriter<T>
-where
- T: AvroSchema,
-{
- pub fn new() -> AvroResult<Self> {
- let schema = T::get_schema();
- let header = RabinFingerprintHeader::from_schema(&schema).build_header();
- let resolved = ResolvedOwnedSchema::new(schema)?;
- // We don't use Self::new_with_header_builder as that would mean calling T::get_schema() twice
- Ok(Self {
- resolved,
- header,
- _model: PhantomData,
- })
- }
-
- pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> {
- let header = header_builder.build_header();
- let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
- Ok(Self {
- resolved,
- header,
- _model: PhantomData,
- })
- }
-
- /// Deprecated. Use [`SpecificSingleObjectWriter::new`] instead.
- #[deprecated(since = "0.22.0", note = "Use new() instead")]
- pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
- Self::new()
- }
-}
-
-impl<T> SpecificSingleObjectWriter<T>
-where
- T: AvroSchema + Into<Value>,
-{
- /// Write the value to the writer
- ///
- /// Returns the number of bytes written.
- ///
- /// Each call writes a complete single-object encoded message (header + data),
- /// making each message independently decodable.
- pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
- writer
- .write_all(&self.header)
- .map_err(Details::WriteBytes)?;
- let value: Value = data.into();
- let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
- Ok(bytes + self.header.len())
- }
-}
-
-impl<T> SpecificSingleObjectWriter<T>
-where
- T: AvroSchema + Serialize,
-{
- /// Write the object to the writer.
- ///
- /// Returns the number of bytes written.
- ///
- /// Each call writes a complete single-object encoded message (header + data),
- /// making each message independently decodable.
- pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
- writer
- .write_all(&self.header)
- .map_err(Details::WriteBytes)?;
-
- let bytes = write_avro_datum_ref(
- self.resolved.get_root_schema(),
- self.resolved.get_names(),
- data,
- writer,
- )?;
-
- Ok(bytes + self.header.len())
- }
-
- /// Write the object to the writer.
- ///
- /// Returns the number of bytes written.
- ///
- /// Each call writes a complete single-object encoded message (header + data),
- /// making each message independently decodable.
- pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
- self.write_ref(&data, writer)
- }
-}
-
-fn write_value_ref_owned_resolved<W: Write>(
- resolved_schema: &ResolvedOwnedSchema,
- value: &Value,
- writer: &mut W,
-) -> AvroResult<usize> {
- let root_schema = resolved_schema.get_root_schema();
- if let Some(reason) = value.validate_internal(
- root_schema,
- resolved_schema.get_names(),
- root_schema.namespace(),
- ) {
- return Err(Details::ValidationWithReason {
- value: value.clone(),
- schema: root_schema.clone(),
- reason,
- }
- .into());
- }
- encode_internal(
- value,
- root_schema,
- resolved_schema.get_names(),
- root_schema.namespace(),
- writer,
- )
-}
-
-/// Encode a value into raw Avro data, also performs schema validation.
-///
-/// **NOTE**: This function has a quite small niche of usage and does NOT generate headers and sync
-/// markers; use [`Writer`] to be fully Avro-compatible if you don't know what
-/// you are doing, instead.
-pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Vec<u8>> {
- let mut buffer = Vec::new();
- write_avro_datum(schema, value, &mut buffer)?;
- Ok(buffer)
-}
-
-/// Write the referenced [Serialize]able object to the provided [Write] object.
-///
-/// Returns a result with the number of bytes written.
-///
-/// **NOTE**: This function has a quite small niche of usage and does **NOT** generate headers and sync
-/// markers; use [`append_ser`](Writer::append_ser) to be fully Avro-compatible
-/// if you don't know what you are doing, instead.
-pub fn write_avro_datum_ref<T: Serialize, W: Write>(
- schema: &Schema,
- names: &NamesRef,
- data: &T,
- writer: &mut W,
-) -> AvroResult<usize> {
- let mut serializer = SchemaAwareWriteSerializer::new(writer, schema, names, None);
- data.serialize(&mut serializer)
-}
-
-/// Encode a value into raw Avro data, also performs schema validation.
-///
-/// If the provided `schema` is incomplete then its dependencies must be
-/// provided in `schemata`
-pub fn to_avro_datum_schemata<T: Into<Value>>(
- schema: &Schema,
- schemata: Vec<&Schema>,
- value: T,
-) -> AvroResult<Vec<u8>> {
- let mut buffer = Vec::new();
- write_avro_datum_schemata(schema, schemata, value, &mut buffer)?;
- Ok(buffer)
-}
-
#[cfg(not(target_arch = "wasm32"))]
fn generate_sync_marker() -> [u8; 16] {
rand::random()
@@ -892,21 +635,10 @@ mod tests {
use std::{cell::RefCell, rc::Rc};
use super::*;
- use crate::{
- Reader,
- decimal::Decimal,
- duration::{Days, Duration, Millis, Months},
- headers::GlueSchemaUuidHeader,
- rabin::Rabin,
- schema::{DecimalSchema, FixedSchema, Name},
- types::Record,
- util::zig_i64,
- };
+ use crate::{Reader, types::Record, util::zig_i64};
use pretty_assertions::assert_eq;
use serde::{Deserialize, Serialize};
- use uuid::Uuid;
- use crate::schema::InnerDecimalSchema;
use crate::{codec::DeflateSettings, error::Details};
use apache_avro_test_helper::TestResult;
@@ -930,53 +662,6 @@ mod tests {
}
"#;
- const UNION_SCHEMA: &str = r#"["null", "long"]"#;
-
- #[test]
- fn test_to_avro_datum() -> TestResult {
- let schema = Schema::parse_str(SCHEMA)?;
- let mut record = Record::new(&schema).unwrap();
- record.put("a", 27i64);
- record.put("b", "foo");
-
- let mut expected = Vec::new();
- zig_i64(27, &mut expected)?;
- zig_i64(3, &mut expected)?;
- expected.extend([b'f', b'o', b'o']);
-
- assert_eq!(to_avro_datum(&schema, record)?, expected);
-
- Ok(())
- }
-
- #[test]
- fn avro_rs_193_write_avro_datum_ref() -> TestResult {
- #[derive(Serialize)]
- struct TestStruct {
- a: i64,
- b: String,
- }
-
- let schema = Schema::parse_str(SCHEMA)?;
- let mut writer: Vec<u8> = Vec::new();
- let data = TestStruct {
- a: 27,
- b: "foo".to_string(),
- };
-
- let mut expected = Vec::new();
- zig_i64(27, &mut expected)?;
- zig_i64(3, &mut expected)?;
- expected.extend([b'f', b'o', b'o']);
-
- let bytes = write_avro_datum_ref(&schema, &HashMap::new(), &data, &mut writer)?;
-
- assert_eq!(bytes, expected.len());
- assert_eq!(writer, expected);
-
- Ok(())
- }
-
#[test]
fn avro_rs_220_flush_write_header() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
@@ -1000,181 +685,6 @@ mod tests {
Ok(())
}
- #[test]
- fn test_union_not_null() -> TestResult {
- let schema = Schema::parse_str(UNION_SCHEMA)?;
- let union = Value::Union(1, Box::new(Value::Long(3)));
-
- let mut expected = Vec::new();
- zig_i64(1, &mut expected)?;
- zig_i64(3, &mut expected)?;
-
- assert_eq!(to_avro_datum(&schema, union)?, expected);
-
- Ok(())
- }
-
- #[test]
- fn test_union_null() -> TestResult {
- let schema = Schema::parse_str(UNION_SCHEMA)?;
- let union = Value::Union(0, Box::new(Value::Null));
-
- let mut expected = Vec::new();
- zig_i64(0, &mut expected)?;
-
- assert_eq!(to_avro_datum(&schema, union)?, expected);
-
- Ok(())
- }
-
- fn logical_type_test<T: Into<Value> + Clone>(
- schema_str: &'static str,
-
- expected_schema: &Schema,
- value: Value,
-
- raw_schema: &Schema,
- raw_value: T,
- ) -> TestResult {
- let schema = Schema::parse_str(schema_str)?;
- assert_eq!(&schema, expected_schema);
- // The serialized format should be the same as the schema.
- let ser = to_avro_datum(&schema, value.clone())?;
- let raw_ser = to_avro_datum(raw_schema, raw_value)?;
- assert_eq!(ser, raw_ser);
-
- // Should deserialize from the schema into the logical type.
- let mut r = ser.as_slice();
- let de = crate::from_avro_datum(&schema, &mut r, None)?;
- assert_eq!(de, value);
- Ok(())
- }
-
- #[test]
- fn date() -> TestResult {
- logical_type_test(
- r#"{"type": "int", "logicalType": "date"}"#,
- &Schema::Date,
- Value::Date(1_i32),
- &Schema::Int,
- 1_i32,
- )
- }
-
- #[test]
- fn time_millis() -> TestResult {
- logical_type_test(
- r#"{"type": "int", "logicalType": "time-millis"}"#,
- &Schema::TimeMillis,
- Value::TimeMillis(1_i32),
- &Schema::Int,
- 1_i32,
- )
- }
-
- #[test]
- fn time_micros() -> TestResult {
- logical_type_test(
- r#"{"type": "long", "logicalType": "time-micros"}"#,
- &Schema::TimeMicros,
- Value::TimeMicros(1_i64),
- &Schema::Long,
- 1_i64,
- )
- }
-
- #[test]
- fn timestamp_millis() -> TestResult {
- logical_type_test(
- r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
- &Schema::TimestampMillis,
- Value::TimestampMillis(1_i64),
- &Schema::Long,
- 1_i64,
- )
- }
-
- #[test]
- fn timestamp_micros() -> TestResult {
- logical_type_test(
- r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
- &Schema::TimestampMicros,
- Value::TimestampMicros(1_i64),
- &Schema::Long,
- 1_i64,
- )
- }
-
- #[test]
- fn decimal_fixed() -> TestResult {
- let size = 30;
- let fixed = FixedSchema {
- name: Name::new("decimal")?,
- aliases: None,
- doc: None,
- size,
- attributes: Default::default(),
- };
- let inner = InnerDecimalSchema::Fixed(fixed.clone());
- let value = vec![0u8; size];
- logical_type_test(
- r#"{"type": {"type": "fixed", "size": 30, "name": "decimal"}, "logicalType": "decimal", "precision": 20, "scale": 5}"#,
- &Schema::Decimal(DecimalSchema {
- precision: 20,
- scale: 5,
- inner,
- }),
- Value::Decimal(Decimal::from(value.clone())),
- &Schema::Fixed(fixed),
- Value::Fixed(size, value),
- )
- }
-
- #[test]
- fn decimal_bytes() -> TestResult {
- let value = vec![0u8; 10];
- logical_type_test(
- r#"{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 3}"#,
- &Schema::Decimal(DecimalSchema {
- precision: 4,
- scale: 3,
- inner: InnerDecimalSchema::Bytes,
- }),
- Value::Decimal(Decimal::from(value.clone())),
- &Schema::Bytes,
- value,
- )
- }
-
- #[test]
- fn duration() -> TestResult {
- let inner = Schema::Fixed(FixedSchema {
- name: Name::new("duration")?,
- aliases: None,
- doc: None,
- size: 12,
- attributes: Default::default(),
- });
- let value = Value::Duration(Duration::new(
- Months::new(256),
- Days::new(512),
- Millis::new(1024),
- ));
- logical_type_test(
- r#"{"type": {"type": "fixed", "name": "duration", "size": 12}, "logicalType": "duration"}"#,
- &Schema::Duration(FixedSchema {
- name: Name::try_from("duration").expect("Name is valid"),
- aliases: None,
- doc: None,
- size: 12,
- attributes: Default::default(),
- }),
- value,
- &inner,
- Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),
- )
- }
-
#[test]
fn test_writer_append() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
@@ -1564,163 +1074,6 @@ mod tests {
Ok(())
}
- #[derive(Serialize, Clone)]
- struct TestSingleObjectWriter {
- a: i64,
- b: f64,
- c: Vec<String>,
- }
-
- impl AvroSchema for TestSingleObjectWriter {
- fn get_schema() -> Schema {
- let schema = r#"
- {
- "type":"record",
- "name":"TestSingleObjectWrtierSerialize",
- "fields":[
- {
- "name":"a",
- "type":"long"
- },
- {
- "name":"b",
- "type":"double"
- },
- {
- "name":"c",
- "type":{
- "type":"array",
- "items":"string"
- }
- }
- ]
- }
- "#;
- Schema::parse_str(schema).unwrap()
- }
- }
-
- impl From<TestSingleObjectWriter> for Value {
- fn from(obj: TestSingleObjectWriter) -> Value {
- Value::Record(vec![
- ("a".into(), obj.a.into()),
- ("b".into(), obj.b.into()),
- (
- "c".into(),
- Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
- ),
- ])
- }
- }
-
- #[test]
- fn test_single_object_writer() -> TestResult {
- let mut buf: Vec<u8> = Vec::new();
- let obj = TestSingleObjectWriter {
- a: 300,
- b: 34.555,
- c: vec!["cat".into(), "dog".into()],
- };
- let mut writer = GenericSingleObjectWriter::new_with_capacity(
- &TestSingleObjectWriter::get_schema(),
- 1024,
- )
- .expect("Should resolve schema");
- let value = obj.into();
- let written_bytes = writer
- .write_value_ref(&value, &mut buf)
- .expect("Error serializing properly");
-
- assert!(buf.len() > 10, "no bytes written");
- assert_eq!(buf.len(), written_bytes);
- assert_eq!(buf[0], 0xC3);
- assert_eq!(buf[1], 0x01);
- assert_eq!(
- &buf[2..10],
- &TestSingleObjectWriter::get_schema()
- .fingerprint::<Rabin>()
- .bytes[..]
- );
- let mut msg_binary = Vec::new();
- encode(
- &value,
- &TestSingleObjectWriter::get_schema(),
- &mut msg_binary,
- )
- .expect("encode should have failed by here as a dependency of any writing");
- assert_eq!(&buf[10..], &msg_binary[..]);
-
- Ok(())
- }
-
- #[test]
- fn test_single_object_writer_with_header_builder() -> TestResult {
- let mut buf: Vec<u8> = Vec::new();
- let obj = TestSingleObjectWriter {
- a: 300,
- b: 34.555,
- c: vec!["cat".into(), "dog".into()],
- };
- let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
- let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
- let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
- &TestSingleObjectWriter::get_schema(),
- 1024,
- header_builder,
- )
- .expect("Should resolve schema");
- let value = obj.into();
- writer
- .write_value_ref(&value, &mut buf)
- .expect("Error serializing properly");
-
- assert_eq!(buf[0], 0x03);
- assert_eq!(buf[1], 0x00);
- assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
- Ok(())
- }
-
- #[test]
- fn test_writer_parity() -> TestResult {
- let obj1 = TestSingleObjectWriter {
- a: 300,
- b: 34.555,
- c: vec!["cat".into(), "dog".into()],
- };
-
- let mut buf1: Vec<u8> = Vec::new();
- let mut buf2: Vec<u8> = Vec::new();
- let mut buf3: Vec<u8> = Vec::new();
- let mut buf4: Vec<u8> = Vec::new();
-
- let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
- &TestSingleObjectWriter::get_schema(),
- 1024,
- )
- .expect("Should resolve schema");
- let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
- .expect("Resolved should pass");
- specific_writer
- .write_ref(&obj1, &mut buf1)
- .expect("Serialization expected");
- specific_writer
- .write_ref(&obj1, &mut buf2)
- .expect("Serialization expected");
- specific_writer
- .write_value(obj1.clone(), &mut buf3)
- .expect("Serialization expected");
-
- generic_writer
- .write_value(obj1.into(), &mut buf4)
- .expect("Serialization expected");
-
- assert_eq!(buf1, buf2);
- assert_eq!(buf2, buf3);
- assert_eq!(buf3, buf4);
-
- Ok(())
- }
-
#[test]
fn avro_3894_take_aliases_into_account_when_serializing() -> TestResult {
const SCHEMA: &str = r#"
@@ -1846,51 +1199,6 @@ mod tests {
Ok(())
}
- #[test]
- fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
- #[derive(Serialize)]
- struct Recursive {
- field: bool,
- recurse: Option<Box<Recursive>>,
- }
-
- impl AvroSchema for Recursive {
- fn get_schema() -> Schema {
- Schema::parse_str(
- r#"{
- "name": "Recursive",
- "type": "record",
- "fields": [
- { "name": "field", "type": "boolean" },
- { "name": "recurse", "type": ["null", "Recursive"] }
- ]
- }"#,
- )
- .unwrap()
- }
- }
-
- let mut buffer = Vec::new();
- let writer = SpecificSingleObjectWriter::new()?;
-
- writer.write(
- Recursive {
- field: true,
- recurse: Some(Box::new(Recursive {
- field: false,
- recurse: None,
- })),
- },
- &mut buffer,
- )?;
- assert_eq!(
- buffer,
- &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
- );
-
- Ok(())
- }
-
#[test]
fn avro_rs_310_append_unvalidated_value() -> TestResult {
let schema = Schema::String;
diff --git a/avro/src/writer/single_object.rs b/avro/src/writer/single_object.rs
new file mode 100644
index 0000000..136986a
--- /dev/null
+++ b/avro/src/writer/single_object.rs
@@ -0,0 +1,397 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{io::Write, marker::PhantomData, ops::RangeInclusive};
+
+use serde::Serialize;
+
+use crate::{
+ AvroResult, AvroSchema, Schema,
+ error::Details,
+ headers::{HeaderBuilder, RabinFingerprintHeader},
+ schema::ResolvedOwnedSchema,
+ types::Value,
+ write_avro_datum_ref,
+ writer::datum::write_value_ref_owned_resolved,
+};
+
+/// Writer that encodes [`Value`]s according to the single object encoding v1 spec.
+///
+/// Uses an API similar to the current file writer. Each message (header followed by the
+/// encoded datum) is assembled in an internal buffer and handed to the output in a single
+/// write, after which the buffer is cut back to the header so it can be reused.
+pub struct GenericSingleObjectWriter {
+    /// The header bytes, followed by the encoded datum only while a write is in progress.
+    buffer: Vec<u8>,
+    /// Resolved form of the schema that values are encoded against.
+    resolved: ResolvedOwnedSchema,
+}
+
+impl GenericSingleObjectWriter {
+    /// Creates a writer for `schema` that prefixes every message with the header built by
+    /// [`RabinFingerprintHeader::from_schema`].
+    ///
+    /// `initial_buffer_cap` is the initial capacity, in bytes, of the internal buffer.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `schema` cannot be converted into a [`ResolvedOwnedSchema`].
+    pub fn new_with_capacity(
+        schema: &Schema,
+        initial_buffer_cap: usize,
+    ) -> AvroResult<GenericSingleObjectWriter> {
+        let header_builder = RabinFingerprintHeader::from_schema(schema);
+        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
+    }
+
+    /// Creates a writer for `schema` that prefixes every message with the header produced
+    /// by `header_builder`.
+    ///
+    /// The header is built once here and kept at the front of the internal buffer, which is
+    /// allocated with a capacity of `initial_buffer_cap` bytes.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if `schema` cannot be converted into a [`ResolvedOwnedSchema`].
+    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
+        schema: &Schema,
+        initial_buffer_cap: usize,
+        header_builder: HB,
+    ) -> AvroResult<GenericSingleObjectWriter> {
+        let mut buffer = Vec::with_capacity(initial_buffer_cap);
+        let header = header_builder.build_header();
+        buffer.extend_from_slice(&header);
+
+        Ok(GenericSingleObjectWriter {
+            buffer,
+            resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
+        })
+    }
+
+    /// Buffer lengths, in bytes, that are accepted as "header only" at the start of a write.
+    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
+
+    /// Write the referenced Value to the provided Write object. Returns a result with the
+    /// number of bytes written including the header.
+    ///
+    /// The internal buffer is restored to just the header before returning, on success and
+    /// on failure alike, so a failed write does not poison later writes.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`Details::IllegalSingleObjectWriterState`] if the buffer does not hold a
+    /// header of an accepted length, the encoding error if `v` cannot be encoded, or
+    /// [`Details::WriteBytes`] if `writer` rejects the bytes.
+    pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
+        let header_length = self.buffer.len();
+        if !Self::HEADER_LENGTH_RANGE.contains(&header_length) {
+            return Err(Details::IllegalSingleObjectWriterState.into());
+        }
+
+        let outcome = self.encode_and_flush(v, writer);
+        // Drop the datum bytes whether or not the write succeeded; returning early with `?`
+        // would leave them behind the header and corrupt or reject the next message.
+        self.buffer.truncate(header_length);
+        outcome
+    }
+
+    /// Appends the encoding of `v` to the buffered header and writes the whole buffer to
+    /// `writer`, returning the number of bytes in the buffer.
+    ///
+    /// Leaves the datum bytes in the buffer; the caller truncates it afterwards.
+    fn encode_and_flush<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
+        write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
+        writer
+            .write_all(&self.buffer)
+            .map_err(Details::WriteBytes)?;
+        Ok(self.buffer.len())
+    }
+
+    /// Write the Value to the provided Write object. Returns a result with the number of
+    /// bytes written including the header.
+    ///
+    /// Takes `v` by value and forwards to [`Self::write_value_ref`].
+    pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
+        self.write_value_ref(&v, writer)
+    }
+}
+
+/// Writer that encodes values of type `T` according to the single object encoding v1 spec.
+///
+/// The schema is taken from `T`'s [`AvroSchema`] implementation.
+pub struct SpecificSingleObjectWriter<T>
+where
+    T: AvroSchema,
+{
+    /// Resolved form of the schema returned by `T::get_schema()`.
+    resolved: ResolvedOwnedSchema,
+    /// Header bytes written in front of every message.
+    header: Vec<u8>,
+    /// Ties the writer to `T` without storing a value of that type.
+    _model: PhantomData<T>,
+}
+
+impl<T> SpecificSingleObjectWriter<T>
+where
+    T: AvroSchema,
+{
+    /// Creates a writer whose header is built by [`RabinFingerprintHeader`] from the schema
+    /// returned by `T::get_schema()`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if [`ResolvedOwnedSchema::new`] fails for that schema.
+    pub fn new() -> AvroResult<Self> {
+        let schema = T::get_schema();
+        let header = RabinFingerprintHeader::from_schema(&schema).build_header();
+        let resolved = ResolvedOwnedSchema::new(schema)?;
+        // We don't use Self::new_with_header_builder as that would mean calling
+        // T::get_schema() twice
+        Ok(Self {
+            resolved,
+            header,
+            _model: PhantomData,
+        })
+    }
+
+    /// Creates a writer that prefixes every message with the header produced by
+    /// `header_builder` instead of the default Rabin fingerprint header.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if [`ResolvedOwnedSchema::new`] fails for `T::get_schema()`.
+    pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> {
+        let header = header_builder.build_header();
+        let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
+        Ok(Self {
+            resolved,
+            header,
+            _model: PhantomData,
+        })
+    }
+
+    /// Deprecated. Use [`SpecificSingleObjectWriter::new`] instead.
+    ///
+    /// The `_buffer_cap` argument is ignored.
+    #[deprecated(since = "0.22.0", note = "Use new() instead")]
+    pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
+        Self::new()
+    }
+}
+
+impl<T> SpecificSingleObjectWriter<T>
+where
+    T: AvroSchema + Into<Value>,
+{
+    /// Write the value to the writer
+    ///
+    /// Returns the number of bytes written.
+    ///
+    /// Each call writes a complete single-object encoded message (header + data),
+    /// making each message independently decodable.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`Details::WriteBytes`] if the header cannot be written, or the error from
+    /// encoding the converted [`Value`]. The header is written first, so it has already
+    /// reached `writer` when encoding fails.
+    pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
+        writer
+            .write_all(&self.header)
+            .map_err(Details::WriteBytes)?;
+        let value: Value = data.into();
+        let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
+        Ok(bytes + self.header.len())
+    }
+}
+
+impl<T> SpecificSingleObjectWriter<T>
+where
+    T: AvroSchema + Serialize,
+{
+    /// Write the object to the writer.
+    ///
+    /// Returns the number of bytes written.
+    ///
+    /// Each call writes a complete single-object encoded message (header + data),
+    /// making each message independently decodable.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`Details::WriteBytes`] if the header cannot be written, or the error from
+    /// [`write_avro_datum_ref`]. The header is written first, so it has already reached
+    /// `writer` when serialization fails.
+    pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
+        writer
+            .write_all(&self.header)
+            .map_err(Details::WriteBytes)?;
+
+        let bytes = write_avro_datum_ref(
+            self.resolved.get_root_schema(),
+            self.resolved.get_names(),
+            data,
+            writer,
+        )?;
+
+        Ok(bytes + self.header.len())
+    }
+
+    /// Write the object to the writer.
+    ///
+    /// Returns the number of bytes written.
+    ///
+    /// Each call writes a complete single-object encoded message (header + data),
+    /// making each message independently decodable.
+    ///
+    /// Takes `data` by value and forwards to [`Self::write_ref`].
+    pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
+        self.write_ref(&data, writer)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use apache_avro_test_helper::TestResult;
+ use uuid::Uuid;
+
+ use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin};
+
+ use super::*;
+
+ /// Record used by the tests in this module; its fields line up with the `a`, `b` and
+ /// `c` fields of the schema returned by its `AvroSchema` implementation.
+ #[derive(Serialize, Clone)]
+ struct TestSingleObjectWriter {
+ a: i64,
+ b: f64,
+ c: Vec<String>,
+ }
+
+ /// Hand-written schema for [`TestSingleObjectWriter`]: a record with a `long`, a
+ /// `double` and an array of `string`.
+ impl AvroSchema for TestSingleObjectWriter {
+ /// Parses the schema from its JSON text; panics if the text is not a valid schema.
+ fn get_schema() -> Schema {
+ // NOTE(review): the record name spells "Wrtier". It is part of the schema, so
+ // correcting it presumably changes the schema fingerprint — confirm before touching.
+ let schema = r#"
+ {
+ "type":"record",
+ "name":"TestSingleObjectWrtierSerialize",
+ "fields":[
+ {
+ "name":"a",
+ "type":"long"
+ },
+ {
+ "name":"b",
+ "type":"double"
+ },
+ {
+ "name":"c",
+ "type":{
+ "type":"array",
+ "items":"string"
+ }
+ }
+ ]
+ }
+ "#;
+ Schema::parse_str(schema).unwrap()
+ }
+ }
+
+    /// Manual conversion used by the `Value`-based writers in the tests below.
+    impl From<TestSingleObjectWriter> for Value {
+        /// Builds a [`Value::Record`] with the fields `a`, `b` and `c`, in that order.
+        fn from(obj: TestSingleObjectWriter) -> Value {
+            Value::Record(vec![
+                ("a".into(), obj.a.into()),
+                ("b".into(), obj.b.into()),
+                (
+                    "c".into(),
+                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
+                ),
+            ])
+        }
+    }
+
+    /// Checks the layout of a message produced by [`GenericSingleObjectWriter`] with the
+    /// default header: two marker bytes, the Rabin fingerprint of the schema, then the
+    /// encoded datum.
+    #[test]
+    fn test_single_object_writer() -> TestResult {
+        let mut buf: Vec<u8> = Vec::new();
+        let obj = TestSingleObjectWriter {
+            a: 300,
+            b: 34.555,
+            c: vec!["cat".into(), "dog".into()],
+        };
+        let mut writer = GenericSingleObjectWriter::new_with_capacity(
+            &TestSingleObjectWriter::get_schema(),
+            1024,
+        )
+        .expect("Should resolve schema");
+        let value = obj.into();
+        let written_bytes = writer
+            .write_value_ref(&value, &mut buf)
+            .expect("Error serializing properly");
+
+        assert!(buf.len() > 10, "no bytes written");
+        assert_eq!(buf.len(), written_bytes);
+        assert_eq!(buf[0], 0xC3);
+        assert_eq!(buf[1], 0x01);
+        assert_eq!(
+            &buf[2..10],
+            &TestSingleObjectWriter::get_schema()
+                .fingerprint::<Rabin>()
+                .bytes[..]
+        );
+        // Everything after the 10-byte header must equal the plain encoding of the value.
+        let mut msg_binary = Vec::new();
+        encode(
+            &value,
+            &TestSingleObjectWriter::get_schema(),
+            &mut msg_binary,
+        )
+        .expect("encode should have failed by here as a dependency of any writing");
+        assert_eq!(&buf[10..], &msg_binary[..]);
+
+        Ok(())
+    }
+
+    /// Checks that a custom header builder (here [`GlueSchemaUuidHeader`]) determines the
+    /// bytes at the start of the message: two marker bytes followed by the 16 UUID bytes.
+    #[test]
+    fn test_single_object_writer_with_header_builder() -> TestResult {
+        let mut buf: Vec<u8> = Vec::new();
+        let obj = TestSingleObjectWriter {
+            a: 300,
+            b: 34.555,
+            c: vec!["cat".into(), "dog".into()],
+        };
+        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
+        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
+            &TestSingleObjectWriter::get_schema(),
+            1024,
+            header_builder,
+        )
+        .expect("Should resolve schema");
+        let value = obj.into();
+        writer
+            .write_value_ref(&value, &mut buf)
+            .expect("Error serializing properly");
+
+        assert_eq!(buf[0], 0x03);
+        assert_eq!(buf[1], 0x00);
+        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
+        Ok(())
+    }
+
+    /// Checks that the generic writer and every entry point of the specific writer used
+    /// here (`write_ref`, `write_value`) produce byte-identical messages for the same object.
+    #[test]
+    fn test_writer_parity() -> TestResult {
+        let obj1 = TestSingleObjectWriter {
+            a: 300,
+            b: 34.555,
+            c: vec!["cat".into(), "dog".into()],
+        };
+
+        let mut buf1: Vec<u8> = Vec::new();
+        let mut buf2: Vec<u8> = Vec::new();
+        let mut buf3: Vec<u8> = Vec::new();
+        let mut buf4: Vec<u8> = Vec::new();
+
+        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
+            &TestSingleObjectWriter::get_schema(),
+            1024,
+        )
+        .expect("Should resolve schema");
+        let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
+            .expect("Resolved should pass");
+        // Writing twice with the same writer also shows that it can be reused.
+        specific_writer
+            .write_ref(&obj1, &mut buf1)
+            .expect("Serialization expected");
+        specific_writer
+            .write_ref(&obj1, &mut buf2)
+            .expect("Serialization expected");
+        specific_writer
+            .write_value(obj1.clone(), &mut buf3)
+            .expect("Serialization expected");
+
+        generic_writer
+            .write_value(obj1.into(), &mut buf4)
+            .expect("Serialization expected");
+
+        assert_eq!(buf1, buf2);
+        assert_eq!(buf2, buf3);
+        assert_eq!(buf3, buf4);
+
+        Ok(())
+    }
+
+ /// Regression test (AVRO-RS-439): `SpecificSingleObjectWriter::write` must be able to
+ /// serialize a record whose schema refers to itself by name.
+ #[test]
+ fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
+ // Self-referential record: `recurse` optionally holds another `Recursive`.
+ #[derive(Serialize)]
+ struct Recursive {
+ field: bool,
+ recurse: Option<Box<Recursive>>,
+ }
+
+ impl AvroSchema for Recursive {
+ fn get_schema() -> Schema {
+ Schema::parse_str(
+ r#"{
+ "name": "Recursive",
+ "type": "record",
+ "fields": [
+ { "name": "field", "type": "boolean" },
+ { "name": "recurse", "type": ["null", "Recursive"] }
+ ]
+ }"#,
+ )
+ .unwrap()
+ }
+ }
+
+ let mut buffer = Vec::new();
+ // The writer's type parameter is inferred from the `write` call below.
+ let writer = SpecificSingleObjectWriter::new()?;
+
+ writer.write(
+ Recursive {
+ field: true,
+ recurse: Some(Box::new(Recursive {
+ field: false,
+ recurse: None,
+ })),
+ },
+ &mut buffer,
+ )?;
+ // Expected bytes: the marker 195, 1 (0xC3, 0x01), then presumably the 8-byte schema
+ // fingerprint, then the 4-byte encoded datum.
+ assert_eq!(
+ buffer,
+ &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
+ );
+
+ Ok(())
+ }
+}