This is an automated email from the ASF dual-hosted git repository. mgrigorov pushed a commit to branch avro-3405-metadata-in-file in repository https://gitbox.apache.org/repos/asf/avro.git
commit dff38f4256ff4afa9a0d12c0d8f8a428d01eff39
Author: Martin Tzvetanov Grigorov <[email protected]>
AuthorDate: Wed Feb 16 10:54:24 2022 +0200

    AVRO-3405 Add API to read/write user metadata in .avro files

    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---
 lang/rust/src/error.rs  |   8 +++++
 lang/rust/src/reader.rs |  51 ++++++++++++++++++++++++
 lang/rust/src/writer.rs | 104 ++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 163 insertions(+)

diff --git a/lang/rust/src/error.rs b/lang/rust/src/error.rs
index d687eea..14a0672 100644
--- a/lang/rust/src/error.rs
+++ b/lang/rust/src/error.rs
@@ -376,6 +376,14 @@ pub enum Error {
     /// Error while resolving Schema::Ref
     #[error("Unresolved schema reference: {0}")]
     SchemaResolutionError(String),
+
+    /// User metadata was added after the file header had been written.
+    #[error("The file metadata is already flushed.")]
+    FileHeaderAlreadyWritten,
+
+    /// A user metadata key used the `avro.` prefix reserved by the specification.
+    #[error("Metadata keys starting with 'avro.' are reserved for internal usage: {0}.")]
+    InvalidMetadataKey(String),
 }
 
 impl serde::ser::Error for Error {
diff --git a/lang/rust/src/reader.rs b/lang/rust/src/reader.rs
index d46b3bd..76ded2e 100644
--- a/lang/rust/src/reader.rs
+++ b/lang/rust/src/reader.rs
@@ -19,6 +19,7 @@
 use crate::{decode::decode, schema::Schema, types::Value, util, AvroResult, Codec, Error};
 use serde_json::from_slice;
 use std::{
+    collections::HashMap,
     io::{ErrorKind, Read},
     str::FromStr,
 };
@@ -35,6 +36,7 @@ struct Block<R> {
     marker: [u8; 16],
     codec: Codec,
     writer_schema: Schema,
+    metadata: HashMap<String, Vec<u8>>,
 }
 
 impl<R: Read> Block<R> {
@@ -47,6 +49,7 @@ impl<R: Read> Block<R> {
             buf_idx: 0,
             message_count: 0,
             marker: [0; 16],
+            metadata: HashMap::default(),
         };
 
         block.read_header()?;
@@ -94,6 +97,20 @@ impl<R: Read> Block<R> {
             {
                 self.codec = codec;
             }
+
+            // Every header entry outside the reserved `avro.` namespace is user
+            // metadata (see the "Object Container Files" section of the Avro spec).
+            self.metadata = meta
+                .iter()
+                .filter(|(key, _)| !key.starts_with("avro."))
+                .filter_map(|(key, value)| match value {
+                    Value::Bytes(bytes) => Some((key.clone(), bytes.clone())),
+                    other => {
+                        warn!("User metadata values must be bytes, found {:?}", other);
+                        None
+                    }
+                })
+                .collect();
         } else {
             return Err(Error::GetHeaderMetadata);
         }
@@ -251,6 +268,12 @@ impl<'a, R: Read> Reader<'a, R> {
         self.reader_schema
     }
 
+    /// Returns the user metadata found in the file header: every entry whose key
+    /// does not start with the reserved `avro.` prefix, with its raw byte value.
+    pub fn user_metadata(&self) -> &HashMap<String, Vec<u8>> {
+        &self.block.metadata
+    }
+
     #[inline]
     fn read_next(&mut self) -> AvroResult<Option<Value>> {
         let read_schema = if self.should_resolve_schema {
@@ -499,4 +522,32 @@ mod tests {
             assert!(value.is_err());
         }
     }
+
+    #[test]
+    fn test_avro_3405_read_user_metadata_success() {
+        use crate::writer::Writer;
+
+        let schema = Schema::parse_str(SCHEMA).unwrap();
+        let mut writer = Writer::new(&schema, Vec::new());
+
+        let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
+        user_meta_data.insert("stringKey".to_string(), b"stringValue".to_vec());
+        user_meta_data.insert("binaryKey".to_string(), vec![0u8, 1, 2, 255]);
+
+        for (k, v) in user_meta_data.iter() {
+            writer.add_meta_data(k.to_string(), v).unwrap();
+        }
+
+        let mut record = Record::new(&schema).unwrap();
+        record.put("a", 27i64);
+        record.put("b", "foo");
+
+        writer.append(record.clone()).unwrap();
+        writer.append(record).unwrap();
+        writer.flush().unwrap();
+        let result = writer.into_inner().unwrap();
+
+        let reader = Reader::new(&result[..]).unwrap();
+        assert_eq!(reader.user_metadata(), &user_meta_data);
+    }
 }
diff --git a/lang/rust/src/writer.rs b/lang/rust/src/writer.rs
index ca08fe0..fab725f 100644
--- a/lang/rust/src/writer.rs
+++ b/lang/rust/src/writer.rs
@@ -49,6 +49,8 @@ pub struct Writer<'a, W> {
     marker: Vec<u8>,
     #[builder(default = false, setter(skip))]
     has_header: bool,
+    #[builder(default, setter(skip))]
+    metadata: HashMap<String, Value>,
 }
 
 impl<'a, W: Write> Writer<'a, W> {
@@ -272,6 +274,27 @@ impl<'a, W: Write> Writer<'a, W> {
         self.writer.write(bytes).map_err(Error::WriteBytes)
     }
 
+    /// Adds a user metadata entry to the file header.
+    ///
+    /// Entries are written as top-level keys of the header metadata map, next to
+    /// `avro.schema` and `avro.codec`, with `value` stored as raw bytes.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`Error::FileHeaderAlreadyWritten`] once the header has been written
+    /// (e.g. after the first record was appended), and [`Error::InvalidMetadataKey`]
+    /// if `key` starts with the `avro.` prefix reserved by the Avro specification.
+    pub fn add_meta_data<T: AsRef<[u8]>>(&mut self, key: String, value: T) -> AvroResult<()> {
+        if self.has_header {
+            return Err(Error::FileHeaderAlreadyWritten);
+        }
+        if key.starts_with("avro.") {
+            return Err(Error::InvalidMetadataKey(key));
+        }
+        self.metadata.insert(key, Value::Bytes(value.as_ref().to_vec()));
+        Ok(())
+    }
+
     /// Create an Avro header based on schema, codec and sync marker.
     fn header(&self) -> Result<Vec<u8>, Error> {
         let schema_bytes = serde_json::to_string(self.schema)
@@ -282,6 +305,12 @@ impl<'a, W: Write> Writer<'a, W> {
         metadata.insert("avro.schema", Value::Bytes(schema_bytes));
         metadata.insert("avro.codec", self.codec.into());
 
+        // User entries live alongside the reserved ones; `add_meta_data` rejects
+        // `avro.`-prefixed keys, so they can never overwrite the entries above.
+        for (key, value) in &self.metadata {
+            metadata.insert(key.as_str(), value.clone());
+        }
+
         let mut header = Vec::new();
         header.extend_from_slice(AVRO_OBJECT_HEADER);
         encode(
@@ -808,4 +837,79 @@ mod tests {
             data.as_slice()
         );
     }
+
+    #[test]
+    fn test_avro_3405_writer_add_metadata_success() {
+        let schema = Schema::parse_str(SCHEMA).unwrap();
+        let mut writer = Writer::new(&schema, Vec::new());
+
+        writer
+            .add_meta_data("stringKey".to_string(), String::from("stringValue"))
+            .unwrap();
+        writer
+            .add_meta_data("strKey".to_string(), "strValue")
+            .unwrap();
+        writer
+            .add_meta_data("bytesKey".to_string(), b"bytesValue")
+            .unwrap();
+        writer
+            .add_meta_data("vecKey".to_string(), vec![1u8, 2, 3])
+            .unwrap();
+
+        let mut record = Record::new(&schema).unwrap();
+        record.put("a", 27i64);
+        record.put("b", "foo");
+
+        writer.append(record.clone()).unwrap();
+        writer.append(record).unwrap();
+        writer.flush().unwrap();
+        let result = writer.into_inner().unwrap();
+
+        // The user entries must be top-level keys of the header map, stored as bytes.
+        let header = crate::decode::decode(
+            &Schema::Map(Box::new(Schema::Bytes)),
+            &mut &result[super::AVRO_OBJECT_HEADER.len()..],
+        )
+        .unwrap();
+        let meta = match header {
+            crate::types::Value::Map(meta) => meta,
+            other => panic!("Expected the header metadata map, got {:?}", other),
+        };
+        let expected: [(&str, &[u8]); 4] = [
+            ("stringKey", b"stringValue"),
+            ("strKey", b"strValue"),
+            ("bytesKey", b"bytesValue"),
+            ("vecKey", &[1, 2, 3]),
+        ];
+        for (key, value) in expected.iter() {
+            assert_eq!(meta.get(*key), Some(&crate::types::Value::Bytes(value.to_vec())));
+        }
+    }
+
+    #[test]
+    fn test_avro_3405_writer_add_metadata_failure() {
+        let schema = Schema::parse_str(SCHEMA).unwrap();
+        let mut writer = Writer::new(&schema, Vec::new());
+
+        let mut record = Record::new(&schema).unwrap();
+        record.put("a", 27i64);
+        record.put("b", "foo");
+        writer.append(record).unwrap();
+
+        match writer.add_meta_data("stringKey".to_string(), "value2") {
+            Err(crate::Error::FileHeaderAlreadyWritten) => (),
+            other => panic!("Expected FileHeaderAlreadyWritten, got {:?}", other),
+        }
+    }
+
+    #[test]
+    fn test_avro_3405_writer_add_metadata_reserved_prefix() {
+        let schema = Schema::parse_str(SCHEMA).unwrap();
+        let mut writer = Writer::new(&schema, Vec::new());
+
+        match writer.add_meta_data("avro.someKey".to_string(), "value") {
+            Err(crate::Error::InvalidMetadataKey(key)) => assert_eq!(key, "avro.someKey"),
+            other => panic!("Expected InvalidMetadataKey, got {:?}", other),
+        }
+    }
 }
