jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2234588681
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -116,88 +129,295 @@ fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
             break;
         }
     }
-    decoder.flush().ok_or_else(|| {
-        ArrowError::ParseError("Unexpected EOF while reading Avro header".to_string())
-    })
+    decoder
+        .flush()
+        .ok_or_else(|| ArrowError::ParseError("Unexpected EOF while reading Avro header".into()))
 }
 
 /// A low-level interface for decoding Avro-encoded bytes into Arrow `RecordBatch`.
+///
+/// This decoder handles both standard Avro container file data and single-object encoded
+/// messages by managing schema resolution and caching decoders.
 #[derive(Debug)]
 pub struct Decoder {
-    record_decoder: RecordDecoder,
+    /// The maximum number of rows to decode into a single batch.
     batch_size: usize,
+    /// The number of rows decoded into the current batch.
     decoded_rows: usize,
+    /// The fingerprint of the active writer schema.
+    active_fp: Option<Fingerprint>,
+    /// The `RecordDecoder` corresponding to the active writer schema.
+    active_decoder: RecordDecoder,
+    /// An LRU cache of inactive `RecordDecoder`s, keyed by schema fingerprint.
+    cache: HashMap<Fingerprint, RecordDecoder>,
+    /// A queue to maintain the least recently used order of the cache.
+    lru: VecDeque<Fingerprint>,
+    /// Maximum number of cached decoders allowed.
+    max_cache_size: usize,
+    /// The user-provided reader schema for projection.
+    reader_schema: Option<AvroSchema<'static>>,
+    /// A store of known writer schemas for single-object decoding.
+    schema_store: Option<SchemaStore<'static>>,
+    /// Whether to decode string data as `StringViewArray`.
+    utf8_view: bool,
+    /// If true, do not allow resolving schemas not already in the `SchemaStore`.
+    static_store_mode: bool,
+    /// If true, schema resolution errors will cause a failure.
+    strict_mode: bool,
+    /// The fingerprint of a schema to switch to after the current batch is flushed.
+    pending_fp: Option<Fingerprint>,
+    /// A `RecordDecoder` for a new schema, staged to become active after the current batch.
+    pending_decoder: Option<RecordDecoder>,
 }
 
 impl Decoder {
-    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
-        Self {
-            record_decoder,
-            batch_size,
-            decoded_rows: 0,
-        }
-    }
-
-    /// Return the Arrow schema for the rows decoded by this decoder
+    /// Return the Arrow schema for the rows decoded by this decoder.
+    #[inline]
     pub fn schema(&self) -> SchemaRef {
-        self.record_decoder.schema().clone()
+        self.active_decoder.schema().clone()
     }
 
-    /// Return the configured maximum number of rows per batch
+    /// Return the configured maximum number of rows per batch.
+    #[inline]
     pub fn batch_size(&self) -> usize {
         self.batch_size
     }
 
-    /// Feed `data` into the decoder row by row until we either:
-    /// - consume all bytes in `data`, or
-    /// - reach `batch_size` decoded rows.
+    /// Returns the number of rows that can be added to this decoder before it is full.
+    #[inline]
+    pub fn capacity(&self) -> usize {
+        self.batch_size.saturating_sub(self.decoded_rows)
+    }
+
+    /// Returns true if the decoder has reached its capacity for the current batch.
+    #[inline]
+    pub fn batch_is_full(&self) -> bool {
+        self.capacity() == 0
+    }
+
+    /// Decodes a slice of Avro data, populating an internal batch of records.
     ///
-    /// Returns the number of bytes consumed.
+    /// This method consumes bytes from the `data` slice and decodes them into
+    /// Arrow records. Decoding continues until either all bytes in `data` have
+    /// been processed, or the internal batch reaches its configured `batch_size`.
+    ///
+    /// # Schema Evolution and Single-Object Encoding
+    ///
+    /// If a `SchemaStore` is available, the decoder supports Avro's single-object
+    /// encoding format, which allows for schema changes within the data stream.
+    /// It looks for a `0xC301` magic byte sequence, which indicates that a schema
+    /// fingerprint follows. When found, the decoder switches to the new schema from
+    /// the store for later records.
+    ///
+    /// This dynamic schema detection can be disabled by enabling `static_store_mode`.
+    /// In this mode, the check for the magic bytes is skipped, avoiding a 2-byte
+    /// lookahead at the start of each record and thus improving performance. This is
+    /// useful when it's known that the writer's schema will not change.
+    ///
+    /// # Return Value
+    ///
+    /// On success, returns `Ok(usize)` with the number of bytes consumed from `data`.
+    ///
+    /// A return value of `0` indicates that more data is needed to decode the next
+    /// record or schema prefix. If the returned value is greater than `0` but less
+    /// than `data.len()`, it means decoding stopped because either the batch became
+    /// full or the end of the input slice was reached mid-record. The caller can
+    /// call `decode` again with the unprocessed portion of the slice.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
-        let mut total_consumed = 0usize;
+        let mut total_consumed = 0;
+        let hash_type = self.schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
         while total_consumed < data.len() && self.decoded_rows < self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
-            // A successful call to record_decoder.decode means one row was decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+            let prefix_bytes = if self.schema_store.is_some()
+                && !self.static_store_mode
+                && data[total_consumed..].starts_with(&SINGLE_OBJECT_MAGIC)
+            {
+                self.handle_prefix(&data[total_consumed..], hash_type)?
+            } else {
+                None
+            };
+            match prefix_bytes {
+                Some(0) => break,
+                Some(n) => {
+                    total_consumed += n;
+                    continue;
+                }
+                None => {
+                    let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+                    if n == 0 {
+                        break;
+                    }

Review Comment:
   > I wonder if the decoder needs to return a Result<Option<usize>> where:
   >
   > * Err(...) indicates decoding somehow failed (ie invalid bytes encountered)
   >   * return the error
   > * Ok(None) means there were not enough bytes available to decode the next record
   >   * break
   > * Ok(Some(n)) means an n-byte record was decoded (n=0 indicates a zero-byte record)
   >   * continue

   I actually like this a lot. I'm still finalizing some things on the `RecordDecoder` side and I'd like to explore it further as part of that work, if that's ok with you? I'll make sure to link this comment to any PR that contains that change. Either way, this code shouldn't have made it into this PR and has been removed.
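For reference, here is a minimal, self-contained sketch of the contract suggested above. It assumes a hypothetical `try_decode_one` helper and a toy one-byte length-prefixed framing, neither of which is the actual `RecordDecoder` API or Avro encoding; the point is only to show how the three outcomes would drive the outer loop:

```rust
use arrow_schema::ArrowError;

/// Toy framing: the first byte is the record length. This is NOT Avro encoding;
/// it exists only to make the proposed Result<Option<usize>> contract concrete.
fn try_decode_one(buf: &[u8]) -> Result<Option<usize>, ArrowError> {
    let Some(&len) = buf.first() else {
        return Ok(None); // not enough bytes to read the length prefix
    };
    let len = len as usize;
    if buf.len() < 1 + len {
        return Ok(None); // record is split across the end of the buffer
    }
    // A real decoder would validate the payload here and return Err(..) on corruption.
    Ok(Some(1 + len)) // one record consumed `1 + len` bytes
}

fn decode_loop(data: &[u8], batch_size: usize) -> Result<usize, ArrowError> {
    let mut total_consumed = 0;
    let mut decoded_rows = 0;
    while total_consumed < data.len() && decoded_rows < batch_size {
        match try_decode_one(&data[total_consumed..])? {
            None => break, // need more input; resume later with the remaining bytes
            Some(n) => {
                // n == 0 (a zero-byte record) still counts as one decoded row,
                // which is what prevents an infinite loop on empty records.
                total_consumed += n;
                decoded_rows += 1;
            }
        }
    }
    Ok(total_consumed)
}
```

The appeal of this shape over returning a bare `usize` is that `Ok(None)` distinguishes "need more bytes" from a genuine zero-byte record, so the caller no longer has to guess why zero bytes were consumed.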
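On the single-object encoding detection described in the new `decode` doc comment: the prefix the decoder scans for is a fixed 10-byte header. A rough, standalone illustration, assuming the Rabin (CRC-64-AVRO) fingerprint; `parse_single_object_prefix` is a hypothetical helper, not the PR's `handle_prefix`:

```rust
/// Avro single-object encoding starts with the 2-byte marker 0xC3 0x01,
/// followed by an 8-byte little-endian fingerprint of the writer schema,
/// followed by the Avro-encoded record body.
const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

/// Returns (fingerprint, prefix length) if `buf` starts with a complete
/// single-object prefix, or None if the marker is absent or more bytes are needed.
fn parse_single_object_prefix(buf: &[u8]) -> Option<(u64, usize)> {
    if !buf.starts_with(&SINGLE_OBJECT_MAGIC) {
        return None;
    }
    let fp: [u8; 8] = buf.get(2..10)?.try_into().ok()?;
    Some((u64::from_le_bytes(fp), 10))
}
```

Judging from the hunk above, the PR's `handle_prefix` additionally distinguishes "marker present but prefix still incomplete" (the `Some(0)` arm, which makes the loop wait for more bytes) from "no marker at all" (the `None` arm, which falls through to record decoding), which this simplified sketch conflates.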