jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2238381876
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -171,11 +192,101 @@ impl Decoder {
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
         if self.decoded_rows == 0 {
-            Ok(None)
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.decoded_rows = 0;
+        // Apply a pending schema switch if one is staged
+        if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() {
+            // Cache the old decoder before replacing it
+            if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder);
+                if self.cache.contains_key(&old_fingerprint) {
+                    self.cache.shift_remove(&old_fingerprint);
+                }
+                self.cache.insert(old_fingerprint, old_decoder);
+            } else {
+                self.active_decoder = new_decoder;
+            }
+        }
+        self.evict_cache();
+        Ok(Some(batch))
+    }
+
+    #[inline]
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        if self.schema_store.is_none()
+            || self.static_store_mode
+            || !buf.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Ok(None);
+        }
+        let full_len = prefix_len(hash_type);
+        if buf.len() < full_len {
+            return Ok(Some(0)); // Not enough data to read the full fingerprint
+        }
+        let fp_bytes = &buf[2..full_len];
+        let new_fp = match hash_type {
+            FingerprintAlgorithm::Rabin => {
+                let Ok(bytes) = <[u8; 8]>::try_from(fp_bytes) else {
+                    return Err(ArrowError::ParseError(format!(
+                        "Invalid Rabin fingerprint length, expected 8, got {}",
+                        fp_bytes.len()
+                    )));
+                };
+                Fingerprint::Rabin(u64::from_le_bytes(bytes))
+            }
+        };
+        // If the fingerprint indicates a schema change, prepare to switch decoders.
+        if self.active_fingerprint != Some(new_fp) {
+            self.prepare_schema_switch(new_fp)?;
+            // If there are already decoded rows, we must flush them first.
+            // Forcing the batch to be full ensures `flush` is called next.
+            if self.decoded_rows > 0 {
+                self.decoded_rows = self.batch_size;
+            }
+        }
+        Ok(Some(full_len))
+    }
+
+    fn prepare_schema_switch(&mut self, new_fingerprint: Fingerprint) -> Result<(), ArrowError> {
+        let new_decoder = if let Some(decoder) = self.cache.shift_remove(&new_fingerprint) {
+            decoder
         } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+            // No cached decoder, create a new one
+            let store = self
+                .schema_store
+                .as_ref()
+                .ok_or_else(|| ArrowError::ParseError("Schema store unavailable".into()))?;
+            let writer_schema = store.lookup(&new_fingerprint).ok_or_else(|| {
+                ArrowError::ParseError(format!("Unknown fingerprint: {new_fingerprint:?}"))
+            })?;
+            let reader_schema = self.reader_schema.clone().ok_or_else(|| {
+                ArrowError::ParseError("Reader schema unavailable for resolution".into())
+            })?;
+            let resolved = AvroField::resolve_from_writer_and_reader(
+                writer_schema,
+                &reader_schema,
+                self.utf8_view,
+                self.strict_mode,
+            )?;
+            RecordDecoder::try_new_with_options(resolved.data_type(), self.utf8_view)?
+        };
+        // Stage the new decoder and fingerprint to be activated after the next flush
+        self.pending_schema = Some((new_fingerprint, new_decoder));
+        Ok(())
+    }
+
+    #[inline]
+    fn evict_cache(&mut self) {
+        while self.cache.len() > self.max_cache_size {
+            if let Some(lru_key) = self.cache.keys().next().cloned() {
+                self.cache.shift_remove(&lru_key);

Review Comment:
   Pushed this change up in my latest commit. That was a good catch.
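For readers following the eviction discussion above, here is a minimal, self-contained sketch of the insertion-order eviction pattern the `evict_cache` loop relies on. It is not part of the PR: it assumes the `indexmap` crate and uses a plain `u64` key and a `String` value as stand-ins for `Fingerprint` and `RecordDecoder`. The key point is that `IndexMap::keys().next()` yields the oldest-inserted key, and `shift_remove` evicts it while preserving the order of the remaining entries.

```rust
use indexmap::IndexMap;

/// Simplified stand-in for the decoder cache: oldest-inserted entry is evicted first.
struct DecoderCache {
    cache: IndexMap<u64, String>,
    max_cache_size: usize,
}

impl DecoderCache {
    fn insert(&mut self, fingerprint: u64, decoder: String) {
        // Remove any existing entry first so re-insertion refreshes its position.
        self.cache.shift_remove(&fingerprint);
        self.cache.insert(fingerprint, decoder);
        self.evict();
    }

    fn evict(&mut self) {
        while self.cache.len() > self.max_cache_size {
            // `keys().next()` is the entry inserted (or refreshed) longest ago.
            if let Some(oldest) = self.cache.keys().next().cloned() {
                self.cache.shift_remove(&oldest);
            } else {
                break;
            }
        }
    }
}

fn main() {
    let mut cache = DecoderCache {
        cache: IndexMap::new(),
        max_cache_size: 2,
    };
    cache.insert(1, "decoder-a".into());
    cache.insert(2, "decoder-b".into());
    cache.insert(3, "decoder-c".into()); // evicts fingerprint 1
    assert_eq!(cache.cache.keys().copied().collect::<Vec<_>>(), vec![2, 3]);
}
```

As a general note on the trade-off: `shift_remove` is O(n) in the number of entries because it preserves insertion order, while `swap_remove` is O(1) but scrambles that order, which this pattern depends on to identify the least-recently-inserted entry.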