martin-g commented on code in PR #139:
URL: https://github.com/apache/avro-rs/pull/139#discussion_r1979454221


##########
avro/src/ser_direct.rs:
##########
@@ -0,0 +1,2049 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{borrow::Cow, io::Write};
+
+use serde::ser;
+
+use crate::{
+    encode::{encode_int, encode_long},
+    error::Error,
+    schema::{Name, NamesRef, Namespace, RecordSchema, Schema},
+};
+
+const RECORD_FIELD_INIT_BUFFER_SIZE: usize = 64; // initial byte capacity of the buffer for a record field received out of order
+const COLLECTION_SERIALIZER_ITEM_LIMIT: usize = 1024; // seq/map serializers flush buffered items once more than this many are pending
+const COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY: usize = 32; // item-buffer list capacity when no length hint is given
+const SINGLE_VALUE_INIT_BUFFER_SIZE: usize = 128; // starting byte capacity of a per-item buffer in seq/map serializers
+
+/// Serializer for sequences and tuples written against an array item schema.
+///
+/// Each element is serialized into its own byte buffer. Buffered elements are
+/// written to the parent writer as a block: the item count followed by the
+/// items. [`end`](Self::end) closes the array with a single zero byte.
+pub struct DirectSerializeSeq<'a, 's, W: Write> {
+    /// Parent serializer holding the destination writer.
+    ser: &'a mut DirectSerializer<'s, W>,
+    /// Schema every element is serialized with.
+    item_schema: &'s Schema,
+    /// Capacity for the next element buffer; tracks the largest element seen.
+    item_buffer_size: usize,
+    /// Serialized elements that have not been written out yet.
+    item_buffers: Vec<Vec<u8>>,
+    /// Bytes written to the parent writer so far.
+    bytes_written: usize,
+}
+
+impl<'a, 's, W: Write> DirectSerializeSeq<'a, 's, W> {
+    /// Creates a sequence serializer; `len` is the optional element count hint.
+    fn new(
+        ser: &'a mut DirectSerializer<'s, W>,
+        item_schema: &'s Schema,
+        len: Option<usize>,
+    ) -> DirectSerializeSeq<'a, 's, W> {
+        // Items are flushed as soon as more than COLLECTION_SERIALIZER_ITEM_LIMIT are
+        // buffered, so a larger hint must not turn into a larger up-front allocation.
+        let capacity = len
+            .unwrap_or(COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY)
+            .min(COLLECTION_SERIALIZER_ITEM_LIMIT + 1);
+
+        DirectSerializeSeq {
+            ser,
+            item_schema,
+            item_buffer_size: SINGLE_VALUE_INIT_BUFFER_SIZE,
+            item_buffers: Vec::with_capacity(capacity),
+            bytes_written: 0,
+        }
+    }
+
+    /// Writes all buffered items as one block (count, then items) and empties the buffer list.
+    ///
+    /// Does nothing when no items are buffered.
+    fn write_buffered_items(&mut self) -> Result<(), Error> {
+        if self.item_buffers.is_empty() {
+            return Ok(());
+        }
+
+        self.bytes_written += encode_long(self.item_buffers.len() as i64, &mut self.ser.writer)?;
+        for item in self.item_buffers.drain(..) {
+            // `write` may accept only part of the buffer; `write_all` writes everything or fails.
+            self.ser
+                .writer
+                .write_all(item.as_slice())
+                .map_err(Error::WriteBytes)?;
+            self.bytes_written += item.len();
+        }
+
+        Ok(())
+    }
+
+    /// Serializes `value` with the item schema into a fresh buffer and queues it.
+    ///
+    /// Queued items are flushed once more than `COLLECTION_SERIALIZER_ITEM_LIMIT` are pending.
+    fn serialize_element<T: ser::Serialize>(&mut self, value: &T) -> Result<(), Error> {
+        let mut item_buffer: Vec<u8> = Vec::with_capacity(self.item_buffer_size);
+        let mut item_ser = DirectSerializer::new(
+            &mut item_buffer,
+            self.item_schema,
+            self.ser.names,
+            self.ser.enclosing_namespace.clone(),
+        );
+        value.serialize(&mut item_ser)?;
+
+        // Size the next buffer for the largest item seen so far, plus some slack.
+        self.item_buffer_size = std::cmp::max(self.item_buffer_size, item_buffer.len() + 16);
+
+        self.item_buffers.push(item_buffer);
+
+        if self.item_buffers.len() > COLLECTION_SERIALIZER_ITEM_LIMIT {
+            self.write_buffered_items()?;
+        }
+
+        Ok(())
+    }
+
+    /// Flushes pending items, writes the terminating zero byte and returns the
+    /// total number of bytes written.
+    fn end(mut self) -> Result<usize, Error> {
+        self.write_buffered_items()?;
+        self.ser
+            .writer
+            .write_all(&[0u8])
+            .map_err(Error::WriteBytes)?;
+        self.bytes_written += 1;
+
+        Ok(self.bytes_written)
+    }
+}
+
+impl<W: Write> ser::SerializeSeq for DirectSerializeSeq<'_, '_, W> {
+    type Ok = usize;
+    type Error = Error;
+
+    /// Queues one element by forwarding to the inherent `serialize_element`.
+    fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
+    where
+        T: ?Sized + ser::Serialize,
+    {
+        // The inherent method takes a sized type, so the reference itself is passed on.
+        DirectSerializeSeq::serialize_element(self, &value)
+    }
+
+    /// Finishes the sequence by forwarding to the inherent `end`.
+    fn end(self) -> Result<Self::Ok, Self::Error> {
+        DirectSerializeSeq::end(self)
+    }
+}
+
+impl<W: Write> ser::SerializeTuple for DirectSerializeSeq<'_, '_, W> {
+    type Ok = usize;
+    type Error = Error;
+
+    /// Tuples are handled exactly like sequences: forwards to the `SerializeSeq` impl.
+    fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error>
+    where
+        T: ?Sized + ser::Serialize,
+    {
+        <Self as ser::SerializeSeq>::serialize_element(self, value)
+    }
+
+    /// Finishes the tuple through the `SerializeSeq` impl.
+    fn end(self) -> Result<Self::Ok, Self::Error> {
+        <Self as ser::SerializeSeq>::end(self)
+    }
+}
+
+/// Serializer for maps written against a map value schema.
+///
+/// Each entry (key followed by value) is serialized into its own byte buffer.
+/// Buffered entries are written to the parent writer as a block: the entry
+/// count followed by the entries.
+pub struct DirectSerializeMap<'a, 's, W: Write> {
+    /// Parent serializer holding the destination writer.
+    ser: &'a mut DirectSerializer<'s, W>,
+    /// Schema every map value is serialized with.
+    item_schema: &'s Schema,
+    /// Capacity for the next entry buffer; tracks the largest entry seen.
+    item_buffer_size: usize,
+    /// Serialized entries that have not been written out yet.
+    item_buffers: Vec<Vec<u8>>,
+    /// Bytes written to the parent writer so far.
+    bytes_written: usize,
+}
+
+impl<'a, 's, W: Write> DirectSerializeMap<'a, 's, W> {
+    /// Creates a map serializer; `len` is the optional entry count hint.
+    fn new(
+        ser: &'a mut DirectSerializer<'s, W>,
+        item_schema: &'s Schema,
+        len: Option<usize>,
+    ) -> DirectSerializeMap<'a, 's, W> {
+        // Entries are flushed as soon as more than COLLECTION_SERIALIZER_ITEM_LIMIT are
+        // buffered, so a larger hint must not turn into a larger up-front allocation.
+        let capacity = len
+            .unwrap_or(COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY)
+            .min(COLLECTION_SERIALIZER_ITEM_LIMIT + 1);
+
+        DirectSerializeMap {
+            ser,
+            item_schema,
+            item_buffer_size: SINGLE_VALUE_INIT_BUFFER_SIZE,
+            item_buffers: Vec::with_capacity(capacity),
+            bytes_written: 0,
+        }
+    }
+
+    /// Writes all buffered entries as one block (count, then entries) and empties the buffer list.
+    ///
+    /// Does nothing when no entries are buffered.
+    fn write_buffered_items(&mut self) -> Result<(), Error> {
+        if self.item_buffers.is_empty() {
+            return Ok(());
+        }
+
+        self.bytes_written += encode_long(self.item_buffers.len() as i64, &mut self.ser.writer)?;
+        for item in self.item_buffers.drain(..) {
+            // `write` may accept only part of the buffer; `write_all` writes everything or fails.
+            self.ser
+                .writer
+                .write_all(item.as_slice())
+                .map_err(Error::WriteBytes)?;
+            self.bytes_written += item.len();
+        }
+
+        Ok(())
+    }
+}
+
+impl<W: Write> ser::SerializeMap for DirectSerializeMap<'_, '_, W> {
+    type Ok = usize;
+    type Error = Error;
+
+    /// Starts a new entry buffer and serializes `key` into it using the string schema.
+    fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error>
+    where
+        T: ?Sized + ser::Serialize,
+    {
+        let mut element_buffer: Vec<u8> = Vec::with_capacity(self.item_buffer_size);
+        let string_schema = Schema::String;
+        let mut key_ser = DirectSerializer::new(
+            &mut element_buffer,
+            &string_schema,
+            self.ser.names,
+            self.ser.enclosing_namespace.clone(),
+        );
+        key.serialize(&mut key_ser)?;
+
+        self.item_buffers.push(element_buffer);
+
+        Ok(())
+    }
+
+    /// Appends the serialized `value` to the buffer opened by the preceding `serialize_key`.
+    ///
+    /// Buffered entries are flushed once more than `COLLECTION_SERIALIZER_ITEM_LIMIT` are pending.
+    ///
+    /// # Panics
+    ///
+    /// Panics if no entry buffer exists, i.e. `serialize_key` was not called first.
+    fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error>
+    where
+        T: ?Sized + ser::Serialize,
+    {
+        // `len() - 1` would underflow on an empty list; state the broken invariant instead.
+        let element_buffer = self
+            .item_buffers
+            .last_mut()
+            .expect("serialize_value must be preceded by serialize_key");
+        let mut val_ser = DirectSerializer::new(
+            &mut *element_buffer,
+            self.item_schema,
+            self.ser.names,
+            self.ser.enclosing_namespace.clone(),
+        );
+        value.serialize(&mut val_ser)?;
+
+        // Size the next buffer for the largest entry seen so far, plus some slack.
+        self.item_buffer_size = std::cmp::max(self.item_buffer_size, element_buffer.len() + 16);
+
+        if self.item_buffers.len() > COLLECTION_SERIALIZER_ITEM_LIMIT {
+            self.write_buffered_items()?;
+        }
+
+        Ok(())
+    }
+
+    /// Flushes pending entries, writes the terminating zero byte and returns the
+    /// total number of bytes written.
+    fn end(mut self) -> Result<Self::Ok, Self::Error> {
+        self.write_buffered_items()?;
+        // `write_all` guarantees the terminator is written or an error is returned.
+        self.ser
+            .writer
+            .write_all(&[0u8])
+            .map_err(Error::WriteBytes)?;
+        self.bytes_written += 1;
+
+        Ok(self.bytes_written)
+    }
+}
+
+/// Serializer for structs written against a record schema.
+///
+/// Fields arriving in schema order are written straight to the parent writer.
+/// Fields arriving early are kept in `buffered_fields` until their turn comes.
+pub struct DirectSerializeStruct<'a, 's, W: Write> {
+    /// Parent serializer holding the destination writer.
+    ser: &'a mut DirectSerializer<'s, W>,
+    /// Record schema whose fields define the output order.
+    record_schema: &'s RecordSchema,
+    /// Index of the next schema field to be written to the writer.
+    item_count: usize,
+    /// Serialized bytes of fields received ahead of their position, by schema field index.
+    buffered_fields: Vec<Option<Vec<u8>>>,
+    /// Bytes written to the parent writer so far.
+    bytes_written: usize,
+}
+
+impl<'a, 's, W: Write> DirectSerializeStruct<'a, 's, W> {
+    /// Creates a record serializer; `len` is the field count reported by the caller.
+    fn new(
+        ser: &'a mut DirectSerializer<'s, W>,
+        record_schema: &'s RecordSchema,
+        len: usize,
+    ) -> DirectSerializeStruct<'a, 's, W> {
+        // Buffered fields are addressed by schema field index, so there must be a slot for
+        // every schema field even when the reported length is smaller.
+        let slots = len.max(record_schema.fields.len());
+
+        DirectSerializeStruct {
+            ser,
+            record_schema,
+            item_count: 0,
+            buffered_fields: vec![None; slots],
+            bytes_written: 0,
+        }
+    }
+
+    /// Writes `value` as the next schema field, then any buffered fields that directly follow it.
+    ///
+    /// # Panics
+    ///
+    /// Panics if every schema field has already been written; callers must check first.
+    fn serialize_next_field<T>(&mut self, value: &T) -> Result<(), Error>
+    where
+        T: ?Sized + ser::Serialize,
+    {
+        let next_field = self.record_schema.fields.get(self.item_count).expect(
+            "Validity of the next field index was expected to have been checked by the caller",
+        );
+
+        // If we receive fields in order, write them directly to the main writer
+        let mut value_ser = DirectSerializer::new(
+            &mut *self.ser.writer,
+            &next_field.schema,
+            self.ser.names,
+            self.ser.enclosing_namespace.clone(),
+        );
+        self.bytes_written += value.serialize(&mut value_ser)?;
+
+        self.item_count += 1;
+
+        // Write any buffered data to the stream that has now become next in line
+        while let Some(buffer) = self
+            .buffered_fields
+            .get_mut(self.item_count)
+            .and_then(|b| b.take())
+        {
+            // `write` may accept only part of the buffer; `write_all` writes everything or fails.
+            self.ser
+                .writer
+                .write_all(buffer.as_slice())
+                .map_err(Error::WriteBytes)?;
+            self.bytes_written += buffer.len();
+            self.item_count += 1;
+        }
+
+        Ok(())
+    }
+
+    /// Returns the number of bytes written, or `Error::GetField` naming the first
+    /// schema field that was never written.
+    fn end(self) -> Result<usize, Error> {
+        match self.record_schema.fields.get(self.item_count) {
+            Some(missing_field) => Err(Error::GetField(missing_field.name.clone())),
+            None => Ok(self.bytes_written),
+        }
+    }
+}
+
+impl<W: Write> ser::SerializeStruct for DirectSerializeStruct<'_, '_, W> {
+    type Ok = usize;
+    type Error = Error;
+
+    /// Serializes the struct field `key` with the schema of the matching record field.
+    ///
+    /// The first not-yet-written schema field whose name or alias equals `key` is used.
+    /// If it is the next field in schema order it is written directly. Otherwise its
+    /// bytes are buffered until the preceding fields have been written.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::FieldName` when no remaining schema field matches `key`, and
+    /// `Error::SerializeRecordFieldWithSchema` when serializing the value fails.
+    fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
+    where
+        T: ?Sized + ser::Serialize,
+    {
+        // Copy the reference so matched fields do not keep `self` borrowed.
+        let record_schema = self.record_schema;
+
+        let matching_field = record_schema
+            .fields
+            .iter()
+            .enumerate()
+            .skip(self.item_count)
+            .find(|(_, field)| {
+                key == field.name.as_str()
+                    || field
+                        .aliases
+                        .iter()
+                        .flatten()
+                        .any(|alias| key == alias.as_str())
+            });
+
+        let (index, field) = match matching_field {
+            Some(found) => found,
+            None => return Err(Error::FieldName(String::from(key))),
+        };
+
+        let wrap_error = |e: Error| Error::SerializeRecordFieldWithSchema {
+            field_name: key,
+            record_schema: Schema::Record(record_schema.clone()),
+            error: Box::new(e),
+        };
+
+        if index == self.item_count {
+            return self.serialize_next_field(&value).map_err(wrap_error);
+        }
+
+        // The field arrived ahead of its turn: keep its bytes until it becomes next in line.
+        let mut buffer: Vec<u8> = Vec::with_capacity(RECORD_FIELD_INIT_BUFFER_SIZE);
+        let mut value_ser = DirectSerializer::new(
+            &mut buffer,
+            &field.schema,
+            self.ser.names,
+            self.ser.enclosing_namespace.clone(),
+        );
+        value.serialize(&mut value_ser).map_err(wrap_error)?;
+
+        // The slot list may have been sized from a smaller field count; grow it instead of
+        // panicking on an out-of-bounds index.
+        if self.buffered_fields.len() <= index {
+            self.buffered_fields.resize(index + 1, None);
+        }
+        self.buffered_fields[index] = Some(buffer);
+
+        Ok(())
+    }
+
+    /// Finishes the record by forwarding to the inherent `end`.
+    fn end(self) -> Result<Self::Ok, Self::Error> {
+        DirectSerializeStruct::end(self)
+    }
+}
+
+impl<W: Write> ser::SerializeStructVariant for DirectSerializeStruct<'_, '_, W> {
+    type Ok = usize;
+    type Error = Error;
+
+    /// Struct variants are handled like plain structs: forwards to the `SerializeStruct` impl.
+    fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error>
+    where
+        T: ?Sized + ser::Serialize,
+    {
+        <Self as ser::SerializeStruct>::serialize_field(self, key, value)
+    }
+
+    /// Finishes the struct variant through the `SerializeStruct` impl.
+    fn end(self) -> Result<Self::Ok, Self::Error> {
+        <Self as ser::SerializeStruct>::end(self)
+    }
+}
+
+/// Serializer for tuple structs and tuple variants, which are written either as a
+/// record (fields taken in positional order) or as an array.
+pub enum DirectSerializeTupleStruct<'a, 's, W: Write> {
+    /// Each tuple field becomes the next record field.
+    Record(DirectSerializeStruct<'a, 's, W>),
+    /// Each tuple field becomes an array element.
+    Array(DirectSerializeSeq<'a, 's, W>),
+}
+
+impl<W: Write> DirectSerializeTupleStruct<'_, '_, W> {
+    /// Hands `value` to the wrapped record or array serializer.
+    fn serialize_field<T>(&mut self, value: &T) -> Result<(), Error>
+    where
+        T: ?Sized + ser::Serialize,
+    {
+        match self {
+            Self::Record(record) => record.serialize_next_field(&value),
+            Self::Array(array) => array.serialize_element(&value),
+        }
+    }
+
+    /// Finishes the wrapped serializer and returns its byte count.
+    fn end(self) -> Result<usize, Error> {
+        match self {
+            Self::Record(record) => record.end(),
+            Self::Array(array) => array.end(),
+        }
+    }
+}
+
+impl<W: Write> ser::SerializeTupleStruct for DirectSerializeTupleStruct<'_, '_, W> {
+    type Ok = usize;
+    type Error = Error;
+
+    /// Serializes one positional field by forwarding to the inherent `serialize_field`.
+    fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error>
+    where
+        T: ?Sized + ser::Serialize,
+    {
+        DirectSerializeTupleStruct::serialize_field(self, &value)
+    }
+
+    /// Finishes the tuple struct by forwarding to the inherent `end`.
+    fn end(self) -> Result<Self::Ok, Self::Error> {
+        DirectSerializeTupleStruct::end(self)
+    }
+}
+
+impl<W: Write> ser::SerializeTupleVariant for DirectSerializeTupleStruct<'_, '_, W> {
+    type Ok = usize;
+    type Error = Error;
+
+    /// Serializes one positional field by forwarding to the inherent `serialize_field`.
+    fn serialize_field<T>(&mut self, value: &T) -> Result<(), Self::Error>
+    where
+        T: ?Sized + ser::Serialize,
+    {
+        DirectSerializeTupleStruct::serialize_field(self, &value)
+    }
+
+    /// Finishes the tuple variant by forwarding to the inherent `end`.
+    fn end(self) -> Result<Self::Ok, Self::Error> {
+        DirectSerializeTupleStruct::end(self)
+    }
+}
+
+/// Serde serializer that writes values directly to a writer, guided by a schema.
+pub struct DirectSerializer<'s, W: Write> {
+    /// Destination of the encoded bytes.
+    writer: &'s mut W,
+    /// Schema used for a value when `schema_stack` is empty.
+    root_schema: &'s Schema,
+    /// Named schemas used to resolve schema references.
+    names: &'s NamesRef<'s>,
+    /// Namespace applied to referenced names that carry no namespace of their own.
+    enclosing_namespace: Namespace,
+    /// Schemas pushed to take precedence over `root_schema` for the next value.
+    schema_stack: Vec<&'s Schema>,
+}
+
+impl<'s, W: Write> DirectSerializer<'s, W> {
+    /// Creates a serializer that writes values matching `schema` to `writer`.
+    ///
+    /// `names` and `enclosing_namespace` are used to resolve schema references.
+    pub fn new(
+        writer: &'s mut W,
+        schema: &'s Schema,
+        names: &'s NamesRef<'s>,
+        enclosing_namespace: Namespace,
+    ) -> DirectSerializer<'s, W> {
+        DirectSerializer {
+            writer,
+            root_schema: schema,
+            names,
+            enclosing_namespace,
+            schema_stack: Vec::new(),
+        }
+    }
+
+    /// Looks up the schema registered under `name`, qualifying it with the
+    /// enclosing namespace when `name` has none.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::SchemaResolutionError` with the qualified name when it is unknown.
+    fn get_ref_schema(&self, name: &'s Name) -> Result<&'s Schema, Error> {
+        let full_name = match name.namespace {
+            Some(_) => Cow::Borrowed(name),
+            None => Cow::Owned(Name {
+                name: name.name.clone(),
+                namespace: self.enclosing_namespace.clone(),
+            }),
+        };
+
+        let ref_schema = self.names.get(full_name.as_ref()).copied();
+
+        // `into_owned` reuses an already-owned name instead of cloning it again.
+        ref_schema.ok_or_else(|| Error::SchemaResolutionError(full_name.into_owned()))
+    }
+
+    /// Writes `bytes` prefixed with their length and returns the number of bytes written.
+    fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, Error> {
+        let length_bytes = encode_long(bytes.len() as i64, &mut self.writer)?;
+        // `write` may accept only part of the slice; `write_all` writes everything or fails.
+        self.writer.write_all(bytes).map_err(Error::WriteBytes)?;
+
+        Ok(length_bytes + bytes.len())
+    }
+}
+
+impl<'a, 's, W: Write> ser::Serializer for &'a mut DirectSerializer<'s, W> {
+    type Ok = usize;
+    type Error = Error;
+    type SerializeSeq = DirectSerializeSeq<'a, 's, W>;
+    type SerializeTuple = DirectSerializeSeq<'a, 's, W>;
+    type SerializeTupleStruct = DirectSerializeTupleStruct<'a, 's, W>;
+    type SerializeTupleVariant = DirectSerializeTupleStruct<'a, 's, W>;
+    type SerializeMap = DirectSerializeMap<'a, 's, W>;
+    type SerializeStruct = DirectSerializeStruct<'a, 's, W>;
+    type SerializeStructVariant = DirectSerializeStruct<'a, 's, W>;
+
+    /// Serializes a `bool` for a boolean schema, or for the first boolean branch of a union.
+    ///
+    /// Returns the number of bytes written, including the union branch index if one was written.
+    fn serialize_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || Error::SerializeValueWithSchema {
+            value_type: "bool",
+            value: format!("{v}"),
+            schema: schema.clone(),
+        };
+
+        match schema {
+            Schema::Boolean => {
+                // `write_all` either writes the byte or fails; `write` may report zero bytes.
+                self.writer
+                    .write_all(&[u8::from(v)])
+                    .map_err(Error::WriteBytes)?;
+                Ok(1)
+            }
+            Schema::Union(sch) => {
+                let branch = sch
+                    .schemas
+                    .iter()
+                    .enumerate()
+                    .find(|(_, variant)| matches!(variant, Schema::Boolean));
+                match branch {
+                    Some((i, variant_sch)) => {
+                        // The branch index is part of the output and must be counted too.
+                        let index_bytes = encode_int(i as i32, &mut *self.writer)?;
+                        self.schema_stack.push(variant_sch);
+                        Ok(index_bytes + self.serialize_bool(v)?)
+                    }
+                    None => Err(create_error()),
+                }
+            }
+            _ => Err(create_error()),
+        }
+    }
+
+    /// Widens the `i8` losslessly and serializes it as an `i32`.
+    fn serialize_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
+        let widened = i32::from(v);
+        self.serialize_i32(widened)
+    }
+
+    /// Widens the `i16` losslessly and serializes it as an `i32`.
+    fn serialize_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
+        let widened = i32::from(v);
+        self.serialize_i32(widened)
+    }
+
+    /// Serializes an `i32` for an int-like or long-like schema, or for the first such
+    /// branch of a union.
+    ///
+    /// Returns the number of bytes written, including the union branch index if one was written.
+    fn serialize_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || Error::SerializeValueWithSchema {
+            value_type: "int (i8 | i16 | i32)",
+            value: format!("{}", v),
+            schema: schema.clone(),
+        };
+
+        match schema {
+            Schema::Int | Schema::TimeMillis | Schema::Date => encode_int(v, &mut self.writer),
+            Schema::Long
+            | Schema::TimeMicros
+            | Schema::TimestampMillis
+            | Schema::TimestampMicros
+            | Schema::TimestampNanos
+            | Schema::LocalTimestampMillis
+            | Schema::LocalTimestampMicros
+            | Schema::LocalTimestampNanos => encode_long(i64::from(v), &mut self.writer),
+            Schema::Union(sch) => {
+                let branch = sch.schemas.iter().enumerate().find(|(_, variant)| {
+                    matches!(
+                        variant,
+                        Schema::Int | Schema::TimeMillis | Schema::Date | Schema::Long
+                            | Schema::TimeMicros | Schema::TimestampMillis
+                            | Schema::TimestampMicros | Schema::TimestampNanos
+                            | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
+                            | Schema::LocalTimestampNanos
+                    )
+                });
+                match branch {
+                    Some((i, variant_sch)) => {
+                        // The branch index is part of the output and must be counted too.
+                        let index_bytes = encode_int(i as i32, &mut *self.writer)?;
+                        self.schema_stack.push(variant_sch);
+                        Ok(index_bytes + self.serialize_i32(v)?)
+                    }
+                    None => Err(create_error()),
+                }
+            }
+            _ => Err(create_error()),
+        }
+    }
+
+    /// Serializes an `i64` for an int-like or long-like schema, or for the first such
+    /// branch of a union. For int-like schemas the value must fit into an `i32`.
+    ///
+    /// Returns the number of bytes written, including the union branch index if one was written.
+    fn serialize_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || Error::SerializeValueWithSchema {
+            value_type: "i64",
+            value: format!("{}", v),
+            schema: schema.clone(),
+        };
+
+        match schema {
+            Schema::Int | Schema::TimeMillis | Schema::Date => {
+                let int_value = i32::try_from(v).map_err(|_| create_error())?;
+                encode_int(int_value, &mut self.writer)
+            }
+            Schema::Long
+            | Schema::TimeMicros
+            | Schema::TimestampMillis
+            | Schema::TimestampMicros
+            | Schema::TimestampNanos
+            | Schema::LocalTimestampMillis
+            | Schema::LocalTimestampMicros
+            | Schema::LocalTimestampNanos => encode_long(v, &mut self.writer),
+            Schema::Union(sch) => {
+                let branch = sch.schemas.iter().enumerate().find(|(_, variant)| {
+                    matches!(
+                        variant,
+                        Schema::Int | Schema::TimeMillis | Schema::Date | Schema::Long
+                            | Schema::TimeMicros | Schema::TimestampMillis
+                            | Schema::TimestampMicros | Schema::TimestampNanos
+                            | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
+                            | Schema::LocalTimestampNanos
+                    )
+                });
+                match branch {
+                    Some((i, variant_sch)) => {
+                        // The branch index is part of the output and must be counted too.
+                        let index_bytes = encode_int(i as i32, &mut *self.writer)?;
+                        self.schema_stack.push(variant_sch);
+                        Ok(index_bytes + self.serialize_i64(v)?)
+                    }
+                    None => Err(create_error()),
+                }
+            }
+            _ => Err(create_error()),
+        }
+    }
+
+    /// Serializes a `u8` for an int-like, long-like or bytes schema, or for the first
+    /// such branch of a union. For a bytes schema the value is written as a one-byte
+    /// byte string.
+    ///
+    /// Returns the number of bytes written, including the union branch index if one was written.
+    fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || Error::SerializeValueWithSchema {
+            value_type: "u8",
+            value: format!("{}", v),
+            schema: schema.clone(),
+        };
+
+        match schema {
+            Schema::Int | Schema::TimeMillis | Schema::Date => {
+                encode_int(i32::from(v), &mut self.writer)
+            }
+            Schema::Long
+            | Schema::TimeMicros
+            | Schema::TimestampMillis
+            | Schema::TimestampMicros
+            | Schema::TimestampNanos
+            | Schema::LocalTimestampMillis
+            | Schema::LocalTimestampMicros
+            | Schema::LocalTimestampNanos => encode_long(i64::from(v), &mut self.writer),
+            Schema::Bytes => self.write_bytes(&[v]),
+            Schema::Union(sch) => {
+                let branch = sch.schemas.iter().enumerate().find(|(_, variant)| {
+                    matches!(
+                        variant,
+                        Schema::Int | Schema::TimeMillis | Schema::Date | Schema::Long
+                            | Schema::TimeMicros | Schema::TimestampMillis
+                            | Schema::TimestampMicros | Schema::TimestampNanos
+                            | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
+                            | Schema::LocalTimestampNanos | Schema::Bytes
+                    )
+                });
+                match branch {
+                    Some((i, variant_sch)) => {
+                        // The branch index is part of the output and must be counted too.
+                        let index_bytes = encode_int(i as i32, &mut *self.writer)?;
+                        self.schema_stack.push(variant_sch);
+                        Ok(index_bytes + self.serialize_u8(v)?)
+                    }
+                    None => Err(create_error()),
+                }
+            }
+            _ => Err(create_error()),
+        }
+    }
+
+    /// Widens the `u16` losslessly and serializes it as a `u32`.
+    fn serialize_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
+        let widened = u32::from(v);
+        self.serialize_u32(widened)
+    }
+
+    /// Serializes a `u32` for an int-like or long-like schema, or for the first such
+    /// branch of a union. For int-like schemas the value must fit into an `i32`.
+    ///
+    /// Returns the number of bytes written, including the union branch index if one was written.
+    fn serialize_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || Error::SerializeValueWithSchema {
+            value_type: "unsigned int (u16 | u32)",
+            value: format!("{v}"),
+            schema: schema.clone(),
+        };
+
+        match schema {
+            Schema::Int | Schema::TimeMillis | Schema::Date => {
+                let int_value = i32::try_from(v).map_err(|_| create_error())?;
+                encode_int(int_value, &mut self.writer)
+            }
+            Schema::Long
+            | Schema::TimeMicros
+            | Schema::TimestampMillis
+            | Schema::TimestampMicros
+            | Schema::TimestampNanos
+            | Schema::LocalTimestampMillis
+            | Schema::LocalTimestampMicros
+            | Schema::LocalTimestampNanos => encode_long(i64::from(v), &mut self.writer),
+            Schema::Union(sch) => {
+                let branch = sch.schemas.iter().enumerate().find(|(_, variant)| {
+                    matches!(
+                        variant,
+                        Schema::Int | Schema::TimeMillis | Schema::Date | Schema::Long
+                            | Schema::TimeMicros | Schema::TimestampMillis
+                            | Schema::TimestampMicros | Schema::TimestampNanos
+                            | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
+                            | Schema::LocalTimestampNanos
+                    )
+                });
+                match branch {
+                    Some((i, variant_sch)) => {
+                        // The branch index is part of the output and must be counted too.
+                        let index_bytes = encode_int(i as i32, &mut *self.writer)?;
+                        self.schema_stack.push(variant_sch);
+                        Ok(index_bytes + self.serialize_u32(v)?)
+                    }
+                    None => Err(create_error()),
+                }
+            }
+            _ => Err(create_error()),
+        }
+    }
+
+    /// Serializes a `u64` for an int-like or long-like schema, or for the first such
+    /// branch of a union. The value must fit into an `i32` for int-like schemas and
+    /// into an `i64` for long-like schemas.
+    ///
+    /// Returns the number of bytes written, including the union branch index if one was written.
+    fn serialize_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || Error::SerializeValueWithSchema {
+            value_type: "u64",
+            value: format!("{}", v),
+            schema: schema.clone(),
+        };
+
+        match schema {
+            Schema::Int | Schema::TimeMillis | Schema::Date => {
+                let int_value = i32::try_from(v).map_err(|_| create_error())?;
+                encode_int(int_value, &mut self.writer)
+            }
+            Schema::Long
+            | Schema::TimeMicros
+            | Schema::TimestampMillis
+            | Schema::TimestampMicros
+            | Schema::TimestampNanos
+            | Schema::LocalTimestampMillis
+            | Schema::LocalTimestampMicros
+            | Schema::LocalTimestampNanos => {
+                let long_value = i64::try_from(v).map_err(|_| create_error())?;
+                encode_long(long_value, &mut self.writer)
+            }
+            Schema::Union(sch) => {
+                let branch = sch.schemas.iter().enumerate().find(|(_, variant)| {
+                    matches!(
+                        variant,
+                        Schema::Int | Schema::TimeMillis | Schema::Date | Schema::Long
+                            | Schema::TimeMicros | Schema::TimestampMillis
+                            | Schema::TimestampMicros | Schema::TimestampNanos
+                            | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
+                            | Schema::LocalTimestampNanos
+                    )
+                });
+                match branch {
+                    Some((i, variant_sch)) => {
+                        // The branch index is part of the output and must be counted too.
+                        let index_bytes = encode_int(i as i32, &mut *self.writer)?;
+                        self.schema_stack.push(variant_sch);
+                        Ok(index_bytes + self.serialize_u64(v)?)
+                    }
+                    None => Err(create_error()),
+                }
+            }
+            _ => Err(create_error()),
+        }
+    }
+
+    /// Serializes an `f32` as little-endian bytes for a float schema (4 bytes) or a
+    /// double schema (widened, 8 bytes), or for the first such branch of a union.
+    ///
+    /// Returns the number of bytes written, including the union branch index if one was written.
+    fn serialize_f32(self, v: f32) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || Error::SerializeValueWithSchema {
+            value_type: "f32",
+            value: format!("{}", v),
+            schema: schema.clone(),
+        };
+
+        match schema {
+            Schema::Float => {
+                let encoded = v.to_le_bytes();
+                // `write` may accept only part of the slice; `write_all` writes everything or fails.
+                self.writer.write_all(&encoded).map_err(Error::WriteBytes)?;
+                Ok(encoded.len())
+            }
+            Schema::Double => {
+                let encoded = f64::from(v).to_le_bytes();
+                self.writer.write_all(&encoded).map_err(Error::WriteBytes)?;
+                Ok(encoded.len())
+            }
+            Schema::Union(sch) => {
+                let branch = sch
+                    .schemas
+                    .iter()
+                    .enumerate()
+                    .find(|(_, variant)| matches!(variant, Schema::Float | Schema::Double));
+                match branch {
+                    Some((i, variant_sch)) => {
+                        // The branch index is part of the output and must be counted too.
+                        let index_bytes = encode_int(i as i32, &mut *self.writer)?;
+                        self.schema_stack.push(variant_sch);
+                        Ok(index_bytes + self.serialize_f32(v)?)
+                    }
+                    None => Err(create_error()),
+                }
+            }
+            _ => Err(create_error()),
+        }
+    }
+
+    /// Serializes an `f64` as little-endian bytes for a double schema (8 bytes) or a
+    /// float schema (narrowed with `as f32`, 4 bytes), or for the first such branch of
+    /// a union.
+    ///
+    /// Returns the number of bytes written, including the union branch index if one was written.
+    fn serialize_f64(self, v: f64) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || Error::SerializeValueWithSchema {
+            value_type: "f64",
+            value: format!("{}", v),
+            schema: schema.clone(),
+        };
+
+        match schema {
+            Schema::Float => {
+                // Narrowing to `f32` is lossy; this keeps the existing conversion.
+                let encoded = (v as f32).to_le_bytes();
+                // `write` may accept only part of the slice; `write_all` writes everything or fails.
+                self.writer.write_all(&encoded).map_err(Error::WriteBytes)?;
+                Ok(encoded.len())
+            }
+            Schema::Double => {
+                let encoded = v.to_le_bytes();
+                self.writer.write_all(&encoded).map_err(Error::WriteBytes)?;
+                Ok(encoded.len())
+            }
+            Schema::Union(sch) => {
+                let branch = sch
+                    .schemas
+                    .iter()
+                    .enumerate()
+                    .find(|(_, variant)| matches!(variant, Schema::Float | Schema::Double));
+                match branch {
+                    Some((i, variant_sch)) => {
+                        // The branch index is part of the output and must be counted too.
+                        let index_bytes = encode_int(i as i32, &mut *self.writer)?;
+                        self.schema_stack.push(variant_sch);
+                        Ok(index_bytes + self.serialize_f64(v)?)
+                    }
+                    None => Err(create_error()),
+                }
+            }
+            _ => Err(create_error()),
+        }
+    }
+
+    /// Serializes a `char` as its length-prefixed UTF-8 bytes for a string or bytes
+    /// schema, or for the first such branch of a union.
+    ///
+    /// Returns the number of bytes written, including the union branch index if one was written.
+    fn serialize_char(self, v: char) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || Error::SerializeValueWithSchema {
+            value_type: "char",
+            value: String::from(v),
+            schema: schema.clone(),
+        };
+
+        match schema {
+            Schema::String | Schema::Bytes => {
+                // A char needs at most 4 UTF-8 bytes, so encode it on the stack.
+                let mut utf8 = [0u8; 4];
+                self.write_bytes(v.encode_utf8(&mut utf8).as_bytes())
+            }
+            Schema::Union(sch) => {
+                let branch = sch
+                    .schemas
+                    .iter()
+                    .enumerate()
+                    .find(|(_, variant)| matches!(variant, Schema::String | Schema::Bytes));
+                match branch {
+                    Some((i, variant_sch)) => {
+                        // The branch index is part of the output and must be counted too.
+                        let index_bytes = encode_int(i as i32, &mut *self.writer)?;
+                        self.schema_stack.push(variant_sch);
+                        Ok(index_bytes + self.serialize_char(v)?)
+                    }
+                    None => Err(create_error()),
+                }
+            }
+            _ => Err(create_error()),
+        }
+    }
+
+    /// Serializes a `&str`.
+    ///
+    /// * String, bytes and uuid schemas: length-prefixed UTF-8 bytes.
+    /// * Fixed schema: the raw bytes, accepted only when their length equals the fixed size.
+    /// * Reference schema: resolved via `get_ref_schema`, then serialized again.
+    /// * Union schema: the first branch of one of the kinds above is used.
+    ///
+    /// Returns the number of bytes written, including the union branch index if one was written.
+    fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || Error::SerializeValueWithSchema {
+            value_type: "string",
+            value: String::from(v),
+            schema: schema.clone(),
+        };
+
+        match schema {
+            Schema::String | Schema::Bytes | Schema::Uuid => self.write_bytes(v.as_bytes()),
+            Schema::Fixed(sch) => {
+                if v.len() != sch.size {
+                    return Err(create_error());
+                }
+                // `write` may accept only part of the slice; `write_all` writes everything or fails.
+                self.writer
+                    .write_all(v.as_bytes())
+                    .map_err(Error::WriteBytes)?;
+                Ok(v.len())
+            }
+            Schema::Ref { name } => {
+                let ref_schema = self.get_ref_schema(name)?;
+                self.schema_stack.push(ref_schema);
+                self.serialize_str(v)
+            }
+            Schema::Union(sch) => {
+                let branch = sch.schemas.iter().enumerate().find(|(_, variant)| {
+                    matches!(
+                        variant,
+                        Schema::String
+                            | Schema::Bytes
+                            | Schema::Uuid
+                            | Schema::Fixed(_)
+                            | Schema::Ref { name: _ }
+                    )
+                });
+                match branch {
+                    Some((i, variant_sch)) => {
+                        // The branch index is part of the output and must be counted too.
+                        let index_bytes = encode_int(i as i32, &mut *self.writer)?;
+                        self.schema_stack.push(variant_sch);
+                        Ok(index_bytes + self.serialize_str(v)?)
+                    }
+                    None => Err(create_error()),
+                }
+            }
+            _ => Err(create_error()),
+        }
+    }
+
+    /// Serializes a byte slice against the schema popped from the schema
+    /// stack (falling back to the root schema when the stack is empty).
+    ///
+    /// * `String`, `Bytes`, `Uuid` and `BigDecimal`: forwarded to `write_bytes`.
+    /// * `Fixed`: raw bytes, only when `v.len()` equals the fixed size.
+    /// * `Duration`: raw bytes, only when `v.len()` is exactly 12.
+    /// * `Decimal` over `Bytes`: forwarded to `write_bytes`.
+    /// * `Decimal` over `Fixed`: `v` is left-padded to the fixed size with the
+    ///   sign byte (`0xFF` when the top bit of the first byte is set, `0x00`
+    ///   otherwise) so the padded two's-complement value is unchanged.
+    /// * `Ref`: the schema returned by `get_ref_schema` is pushed and the
+    ///   value is serialized against it.
+    /// * `Union`: the first bytes-compatible variant is selected; its index is
+    ///   written with `encode_int`, then the value is serialized against it.
+    ///
+    /// Returns the sum of the counts reported by the individual writes,
+    /// including padding and the union index when they are written.
+    ///
+    /// # Errors
+    ///
+    /// Returns `Error::SerializeValueWithSchema` when the schema cannot hold
+    /// the bytes, and `Error::CompareFixedSizes` when a decimal value is longer
+    /// than its fixed size. Errors from the writer and from `get_ref_schema`
+    /// are propagated.
+    fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || {
+            use std::fmt::Write;
+
+            // Two hex digits per byte; zero-padding keeps the dump unambiguous
+            // (0x0a must render as "0a", not "a").
+            let mut v_str = String::with_capacity(v.len() * 2);
+            for b in v {
+                if write!(&mut v_str, "{:02x}", b).is_err() {
+                    v_str.push_str("??");
+                }
+            }
+            Error::SerializeValueWithSchema {
+                value_type: "bytes",
+                value: v_str,
+                schema: schema.clone(),
+            }
+        };
+
+        match schema {
+            Schema::String | Schema::Bytes | Schema::Uuid | Schema::BigDecimal => {
+                self.write_bytes(v)
+            }
+            Schema::Fixed(sch) => {
+                if v.len() != sch.size {
+                    return Err(create_error());
+                }
+                // `Write::write` may accept only part of the buffer.
+                self.writer.write_all(v).map_err(Error::WriteBytes)?;
+                Ok(v.len())
+            }
+            Schema::Duration => {
+                if v.len() != 12 {
+                    return Err(create_error());
+                }
+                self.writer.write_all(v).map_err(Error::WriteBytes)?;
+                Ok(v.len())
+            }
+            Schema::Decimal(sch) => match sch.inner.as_ref() {
+                Schema::Bytes => self.write_bytes(v),
+                Schema::Fixed(fixed_sch) => match fixed_sch.size.checked_sub(v.len()) {
+                    Some(pad) => {
+                        // Sign-extend: repeating the first byte itself would
+                        // change the value (e.g. [0x01] -> [0x01, 0x01]).
+                        let sign_byte = match v.first() {
+                            Some(&first) if first & 0x80 != 0 => 0xFF,
+                            _ => 0x00,
+                        };
+                        let mut padded = vec![sign_byte; pad];
+                        padded.extend_from_slice(v);
+                        self.writer
+                            .write_all(padded.as_slice())
+                            .map_err(Error::WriteBytes)?;
+                        // Report padding and payload together.
+                        Ok(padded.len())
+                    }
+                    None => Err(Error::CompareFixedSizes {
+                        size: fixed_sch.size,
+                        n: v.len(),
+                    }),
+                },
+                _ => Err(create_error()),
+            },
+            Schema::Ref { name } => {
+                let ref_schema = self.get_ref_schema(name)?;
+                self.schema_stack.push(ref_schema);
+                self.serialize_bytes(v)
+            }
+            Schema::Union(sch) => {
+                for (i, variant_sch) in sch.schemas.iter().enumerate() {
+                    match variant_sch {
+                        Schema::String
+                        | Schema::Bytes
+                        | Schema::Uuid
+                        | Schema::BigDecimal
+                        | Schema::Fixed(_)
+                        | Schema::Duration
+                        | Schema::Decimal(_)
+                        | Schema::Ref { name: _ } => {
+                            // Keep the index's byte count so the returned
+                            // total covers everything that was written.
+                            let index_len = encode_int(i as i32, &mut *self.writer)?;
+                            self.schema_stack.push(variant_sch);
+                            return Ok(index_len + self.serialize_bytes(v)?);
+                        }
+                        _ => { /* skip */ }
+                    }
+                }
+                Err(create_error())
+            }
+            _ => Err(create_error()),
+        }
+    }
+
+    /// Serializes an absent value (`None`) against the schema popped from the
+    /// schema stack (falling back to the root schema when the stack is empty).
+    ///
+    /// A `Null` schema writes nothing and reports `0`. For a `Union`, the
+    /// index of the first `Null` variant is written with `encode_int` and that
+    /// call's result is returned. Any other schema, or a union without a
+    /// `Null` variant, yields `Error::SerializeValueWithSchema`.
+    fn serialize_none(self) -> Result<Self::Ok, Self::Error> {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        // Locate the union branch that can represent "no value", if any.
+        let null_index = match schema {
+            Schema::Null => return Ok(0),
+            Schema::Union(union_schema) => union_schema
+                .schemas
+                .iter()
+                .position(|candidate| matches!(candidate, Schema::Null)),
+            _ => None,
+        };
+
+        match null_index {
+            Some(index) => encode_int(index as i32, &mut *self.writer),
+            None => Err(Error::SerializeValueWithSchema {
+                value_type: "none",
+                value: String::from("None"),
+                schema: schema.clone(),
+            }),
+        }
+    }
+
+    fn serialize_some<T>(self, value: &T) -> Result<Self::Ok, Self::Error>
+    where
+        T: ?Sized + ser::Serialize,
+    {
+        let schema = self.schema_stack.pop().unwrap_or(self.root_schema);
+
+        let create_error = || Error::SerializeValueWithSchema {
+            value_type: "some",
+            value: String::from("Some(?)"),
+            schema: schema.clone(),
+        };
+
+        match schema {
+            Schema::Union(sch) => {
+                for (i, variant_sch) in sch.schemas.iter().enumerate() {
+                    match variant_sch {
+                        Schema::Null => { /* skip */ }

Review Comment:
   Hm.
   Is there a test case for this?
   Shouldn't we return an error instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to