jecsand838 commented on code in PR #8006: URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2238325798
########## arrow-avro/src/reader/mod.rs: ########## @@ -116,88 +129,295 @@ fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> { break; } } - decoder.flush().ok_or_else(|| { - ArrowError::ParseError("Unexpected EOF while reading Avro header".to_string()) - }) + decoder + .flush() + .ok_or_else(|| ArrowError::ParseError("Unexpected EOF while reading Avro header".into())) } /// A low-level interface for decoding Avro-encoded bytes into Arrow `RecordBatch`. +/// +/// This decoder handles both standard Avro container file data and single-object encoded +/// messages by managing schema resolution and caching decoders. #[derive(Debug)] pub struct Decoder { - record_decoder: RecordDecoder, + /// The maximum number of rows to decode into a single batch. batch_size: usize, + /// The number of rows decoded into the current batch. decoded_rows: usize, + /// The fingerprint of the active writer schema. + active_fp: Option<Fingerprint>, + /// The `RecordDecoder` corresponding to the active writer schema. + active_decoder: RecordDecoder, + /// An LRU cache of inactive `RecordDecoder`s, keyed by schema fingerprint. + cache: HashMap<Fingerprint, RecordDecoder>, + /// A queue to maintain the least recently used order of the cache. + lru: VecDeque<Fingerprint>, + /// Maximum number of cached decoders allowed. + max_cache_size: usize, + /// The user-provided reader schema for projection. + reader_schema: Option<AvroSchema<'static>>, + /// A store of known writer schemas for single-object decoding. + schema_store: Option<SchemaStore<'static>>, + /// Whether to decode string data as `StringViewArray`. + utf8_view: bool, + /// If true, do not allow resolving schemas not already in the `SchemaStore`. + static_store_mode: bool, + /// If true, schema resolution errors will cause a failure. + strict_mode: bool, + /// The fingerprint of a schema to switch to after the current batch is flushed. 
+ pending_fp: Option<Fingerprint>, + /// A `RecordDecoder` for a new schema, staged to become active after the current batch. + pending_decoder: Option<RecordDecoder>, } impl Decoder { - fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self { - Self { - record_decoder, - batch_size, - decoded_rows: 0, - } - } - - /// Return the Arrow schema for the rows decoded by this decoder + /// Return the Arrow schema for the rows decoded by this decoder. + #[inline] pub fn schema(&self) -> SchemaRef { - self.record_decoder.schema().clone() + self.active_decoder.schema().clone() } - /// Return the configured maximum number of rows per batch + /// Return the configured maximum number of rows per batch. + #[inline] pub fn batch_size(&self) -> usize { self.batch_size } - /// Feed `data` into the decoder row by row until we either: - /// - consume all bytes in `data`, or - /// - reach `batch_size` decoded rows. + /// Returns the number of rows that can be added to this decoder before it is full. + #[inline] + pub fn capacity(&self) -> usize { + self.batch_size.saturating_sub(self.decoded_rows) + } + + /// Returns true if the decoder has reached its capacity for the current batch. + #[inline] + pub fn batch_is_full(&self) -> bool { + self.capacity() == 0 + } + + /// Decodes a slice of Avro data, populating an internal batch of records. /// - /// Returns the number of bytes consumed. + /// This method consumes bytes from the `data` slice and decodes them into + /// Arrow records. Decoding continues until either all bytes in `data` have + /// been processed, or the internal batch reaches its configured `batch_size`. + /// + /// # Schema Evolution and Single-Object Encoding + /// + /// If a `SchemaStore` is available, the decoder supports Avro's single-object + /// encoding format, which allows for schema changes within the data stream. + /// It looks for a `0xC301` magic byte sequence, which indicates that a schema + /// fingerprint follows. 
When found, the decoder switches to the new schema from + /// the store for later records. + /// + /// This dynamic schema detection can be disabled by enabling `static_store_mode`. + /// In this mode, the check for the magic bytes is skipped, avoiding a 2-byte + /// lookahead at the start of each record and thus improving performance. This is + /// useful when it's known that the writer's schema will not change. + /// + /// # Return Value + /// + /// On success, returns `Ok(usize)` with the number of bytes consumed from `data`. + /// + /// A return value of `0` indicates that more data is needed to decode the next + /// record or schema prefix. If the returned value is greater than `0` but less + /// than `data.len()`, it means decoding stopped because either the batch became + /// full or the end of the input slice was reached mid-record. The caller can + /// call `decode` again with the unprocessed portion of the slice. pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> { - let mut total_consumed = 0usize; + let mut total_consumed = 0; + let hash_type = self.schema_store.as_ref().map_or( + FingerprintAlgorithm::Rabin, + SchemaStore::fingerprint_algorithm, + ); while total_consumed < data.len() && self.decoded_rows < self.batch_size { - let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?; - // A successful call to record_decoder.decode means one row was decoded. - // If `consumed` is 0 on a non-empty buffer, it implies a valid zero-byte record. - // We increment `decoded_rows` to mark progress and avoid an infinite loop. - // We add `consumed` (which can be 0) to `total_consumed`. - total_consumed += consumed; - self.decoded_rows += 1; + let prefix_bytes = if self.schema_store.is_some() + && !self.static_store_mode + && data[total_consumed..].starts_with(&SINGLE_OBJECT_MAGIC) + { + self.handle_prefix(&data[total_consumed..], hash_type)? 
+ } else { + None + }; + match prefix_bytes { + Some(0) => break, + Some(n) => { + total_consumed += n; + continue; + } + None => { + let n = self.active_decoder.decode(&data[total_consumed..], 1)?; + if n == 0 { + break; + } + total_consumed += n; + self.decoded_rows += 1; + } + } } Ok(total_consumed) } - /// Produce a `RecordBatch` if at least one row is fully decoded, returning - /// `Ok(None)` if no new rows are available. + /// Flushes any fully decoded rows into a `RecordBatch`. + /// + /// This method should be called after one or more successful calls to `decode`. + /// It collects all rows decoded since the last flush and creates a `RecordBatch` from them. + /// + /// After the batch is created, the internal count of decoded rows is reset. + /// + /// ## Schema Switching and Caching + /// + /// If a pending schema switch was scheduled (i.e., by encountering a new schema fingerprint), + /// it is applied after the current batch is created. This ensures that each `RecordBatch` + /// has a single, consistent schema. The old decoder is moved to a cache for potential reuse, + /// and the new decoder becomes active for later `decode` calls. The cache is managed + /// with a least recently used (LRU) eviction policy. + /// + /// ## Returns + /// + /// - `Ok(Some(RecordBatch))` if there were decoded rows to flush. + /// - `Ok(None)` if no rows have been decoded since the last flush operation. + /// - `Err(ArrowError)` if an inconsistent state is detected during a schema switch (i.e., a + /// pending decoder is present without a corresponding fingerprint). 
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { if self.decoded_rows == 0 { - Ok(None) - } else { - let batch = self.record_decoder.flush()?; - self.decoded_rows = 0; - Ok(Some(batch)) + return Ok(None); } + let batch = self.active_decoder.flush()?; + self.decoded_rows = 0; + // Apply a pending schema switch if one is staged + match (self.pending_decoder.take(), self.pending_fp.take()) { + (Some(new_dec), Some(new_fp)) => { + // Cache the old decoder before replacing it + if let Some(old_fp) = self.active_fp.replace(new_fp) { + let old_decoder = std::mem::replace(&mut self.active_decoder, new_dec); + self.cache.insert(old_fp, old_decoder); + self.touch_cache_key(old_fp); + } else { + self.active_decoder = new_dec; + } + } + (Some(_), None) => { + return Err(ArrowError::InvalidArgumentError( + "Inconsistent state: pending decoder without a pending fingerprint".into(), + )); + } + (None, _) => {} + } + self.evict_cache(); + Ok(Some(batch)) } - /// Returns the number of rows that can be added to this decoder before it is full. - pub fn capacity(&self) -> usize { - self.batch_size.saturating_sub(self.decoded_rows) + #[inline] + fn touch_cache_key(&mut self, fp: Fingerprint) { + if let Some(pos) = self.lru.iter().position(|&k| k == fp) { + self.lru.remove(pos); + } + self.lru.push_back(fp); } - /// Returns true if the decoder has reached its capacity for the current batch. 
- pub fn batch_is_full(&self) -> bool { - self.capacity() == 0 + #[inline] + fn evict_cache(&mut self) { + while self.lru.len() > self.max_cache_size { + if let Some(lru) = self.lru.pop_front() { + self.cache.remove(&lru); + } + } + } + + fn handle_prefix( + &mut self, + buf: &[u8], + hash_type: FingerprintAlgorithm, + ) -> Result<Option<usize>, ArrowError> { + let full_len = prefix_len(hash_type); + if buf.len() < full_len { + return Ok(Some(0)); // Not enough data to read the full fingerprint + } + let fp_bytes = &buf[2..full_len]; + let new_fp = match hash_type { + FingerprintAlgorithm::Rabin => { + let Ok(bytes) = <[u8; 8]>::try_from(fp_bytes) else { + return Err(ArrowError::ParseError(format!( + "Invalid Rabin fingerprint length, expected 8, got {}", + fp_bytes.len() + ))); + }; + Fingerprint::Rabin(u64::from_le_bytes(bytes)) + } + }; + // If the fingerprint indicates a schema change, prepare to switch decoders. + if self.active_fp != Some(new_fp) { + if self.static_store_mode && self.active_fp.is_some() { + return Err(ArrowError::ParseError( + "Schema fingerprint changed in static_store_mode".into(), + )); + } + self.prepare_schema_switch(new_fp)?; + // If there are already decoded rows, we must flush them first. + // Forcing the batch to be full ensures `flush` is called next. + if self.decoded_rows > 0 { + self.decoded_rows = self.batch_size; Review Comment: I went ahead and pushed up the first approach you mentioned, i.e., decrementing `capacity`. Logically it makes more sense to go that way; that was a good suggestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org