scovich commented on code in PR #8006: URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2236614910
########## arrow-avro/src/reader/mod.rs: ########## @@ -171,11 +192,101 @@ impl Decoder { /// `Ok(None)` if no new rows are available. pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { if self.decoded_rows == 0 { - Ok(None) + return Ok(None); + } + let batch = self.active_decoder.flush()?; + self.decoded_rows = 0; + // Apply a pending schema switch if one is staged + if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() { + // Cache the old decoder before replacing it + if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) { + let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder); + if self.cache.contains_key(&old_fingerprint) { + self.cache.shift_remove(&old_fingerprint); + } + self.cache.insert(old_fingerprint, old_decoder); Review Comment: Can do very clean/simple eviction logic here, after the insert: ```suggestion self.cache.insert(old_fingerprint, old_decoder); if self.cache.len() > self.max_cache_size { self.cache.shift_remove_index(0); } ``` ########## arrow-avro/src/reader/mod.rs: ########## @@ -171,11 +192,101 @@ impl Decoder { /// `Ok(None)` if no new rows are available. 
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { if self.decoded_rows == 0 { - Ok(None) + return Ok(None); + } + let batch = self.active_decoder.flush()?; + self.decoded_rows = 0; + // Apply a pending schema switch if one is staged + if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() { + // Cache the old decoder before replacing it + if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) { + let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder); + if self.cache.contains_key(&old_fingerprint) { + self.cache.shift_remove(&old_fingerprint); + } + self.cache.insert(old_fingerprint, old_decoder); + } else { + self.active_decoder = new_decoder; + } + } + self.evict_cache(); + Ok(Some(batch)) + } + + #[inline] + fn handle_prefix( + &mut self, + buf: &[u8], + hash_type: FingerprintAlgorithm, + ) -> Result<Option<usize>, ArrowError> { + if self.schema_store.is_none() + || self.static_store_mode + || !buf.starts_with(&SINGLE_OBJECT_MAGIC) + { + return Ok(None); + } + let full_len = prefix_len(hash_type); + if buf.len() < full_len { + return Ok(Some(0)); // Not enough data to read the full fingerprint + } + let fp_bytes = &buf[2..full_len]; + let new_fp = match hash_type { + FingerprintAlgorithm::Rabin => { + let Ok(bytes) = <[u8; 8]>::try_from(fp_bytes) else { + return Err(ArrowError::ParseError(format!( + "Invalid Rabin fingerprint length, expected 8, got {}", + fp_bytes.len() + ))); + }; + Fingerprint::Rabin(u64::from_le_bytes(bytes)) + } + }; + // If the fingerprint indicates a schema change, prepare to switch decoders. + if self.active_fingerprint != Some(new_fp) { + self.prepare_schema_switch(new_fp)?; + // If there are already decoded rows, we must flush them first. + // Forcing the batch to be full ensures `flush` is called next. 
+ if self.decoded_rows > 0 { + self.decoded_rows = self.batch_size; + } + } + Ok(Some(full_len)) + } + + fn prepare_schema_switch(&mut self, new_fingerprint: Fingerprint) -> Result<(), ArrowError> { + let new_decoder = if let Some(decoder) = self.cache.shift_remove(&new_fingerprint) { + decoder } else { - let batch = self.record_decoder.flush()?; - self.decoded_rows = 0; - Ok(Some(batch)) + // No cached decoder, create a new one + let store = self + .schema_store + .as_ref() + .ok_or_else(|| ArrowError::ParseError("Schema store unavailable".into()))?; + let writer_schema = store.lookup(&new_fingerprint).ok_or_else(|| { + ArrowError::ParseError(format!("Unknown fingerprint: {new_fingerprint:?}")) + })?; + let reader_schema = self.reader_schema.clone().ok_or_else(|| { + ArrowError::ParseError("Reader schema unavailable for resolution".into()) + })?; + let resolved = AvroField::resolve_from_writer_and_reader( + writer_schema, + &reader_schema, + self.utf8_view, + self.strict_mode, + )?; + RecordDecoder::try_new_with_options(resolved.data_type(), self.utf8_view)? + }; + // Stage the new decoder and fingerprint to be activated after the next flush + self.pending_schema = Some((new_fingerprint, new_decoder)); + Ok(()) + } + + #[inline] + fn evict_cache(&mut self) { + while self.cache.len() > self.max_cache_size { + if let Some(lru_key) = self.cache.keys().next().cloned() { + self.cache.shift_remove(&lru_key); Review Comment: Actually... looking at the code, there is only one call site for this method, and there will be at most one extra entry to remove. We should probably just bake that in at the call site, instead of splitting the logic up like this? ########## arrow-avro/src/reader/mod.rs: ########## @@ -171,11 +192,101 @@ impl Decoder { /// `Ok(None)` if no new rows are available. 
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { if self.decoded_rows == 0 { - Ok(None) + return Ok(None); + } + let batch = self.active_decoder.flush()?; + self.decoded_rows = 0; + // Apply a pending schema switch if one is staged + if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() { + // Cache the old decoder before replacing it + if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) { + let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder); + if self.cache.contains_key(&old_fingerprint) { + self.cache.shift_remove(&old_fingerprint); + } Review Comment: I'm pretty sure a "failed" `shift_remove` will have _exactly_ the same cost as a `contains_key` check, so we can just blindly call it: ```suggestion self.cache.shift_remove(&old_fingerprint); ``` ########## arrow-avro/src/reader/mod.rs: ########## @@ -171,11 +192,101 @@ impl Decoder { /// `Ok(None)` if no new rows are available. pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> { if self.decoded_rows == 0 { - Ok(None) + return Ok(None); + } + let batch = self.active_decoder.flush()?; + self.decoded_rows = 0; + // Apply a pending schema switch if one is staged + if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() { + // Cache the old decoder before replacing it + if let Some(old_fingerprint) = self.active_fingerprint.replace(new_fingerprint) { + let old_decoder = std::mem::replace(&mut self.active_decoder, new_decoder); + if self.cache.contains_key(&old_fingerprint) { + self.cache.shift_remove(&old_fingerprint); + } + self.cache.insert(old_fingerprint, old_decoder); + } else { + self.active_decoder = new_decoder; + } + } + self.evict_cache(); + Ok(Some(batch)) + } + + #[inline] + fn handle_prefix( + &mut self, + buf: &[u8], + hash_type: FingerprintAlgorithm, + ) -> Result<Option<usize>, ArrowError> { + if self.schema_store.is_none() + || self.static_store_mode + || 
!buf.starts_with(&SINGLE_OBJECT_MAGIC) + { + return Ok(None); + } + let full_len = prefix_len(hash_type); + if buf.len() < full_len { + return Ok(Some(0)); // Not enough data to read the full fingerprint + } + let fp_bytes = &buf[2..full_len]; + let new_fp = match hash_type { + FingerprintAlgorithm::Rabin => { + let Ok(bytes) = <[u8; 8]>::try_from(fp_bytes) else { + return Err(ArrowError::ParseError(format!( + "Invalid Rabin fingerprint length, expected 8, got {}", + fp_bytes.len() + ))); + }; + Fingerprint::Rabin(u64::from_le_bytes(bytes)) + } + }; + // If the fingerprint indicates a schema change, prepare to switch decoders. + if self.active_fingerprint != Some(new_fp) { + self.prepare_schema_switch(new_fp)?; + // If there are already decoded rows, we must flush them first. + // Forcing the batch to be full ensures `flush` is called next. + if self.decoded_rows > 0 { + self.decoded_rows = self.batch_size; + } + } + Ok(Some(full_len)) + } + + fn prepare_schema_switch(&mut self, new_fingerprint: Fingerprint) -> Result<(), ArrowError> { + let new_decoder = if let Some(decoder) = self.cache.shift_remove(&new_fingerprint) { + decoder } else { - let batch = self.record_decoder.flush()?; - self.decoded_rows = 0; - Ok(Some(batch)) + // No cached decoder, create a new one + let store = self + .schema_store + .as_ref() + .ok_or_else(|| ArrowError::ParseError("Schema store unavailable".into()))?; + let writer_schema = store.lookup(&new_fingerprint).ok_or_else(|| { + ArrowError::ParseError(format!("Unknown fingerprint: {new_fingerprint:?}")) + })?; + let reader_schema = self.reader_schema.clone().ok_or_else(|| { + ArrowError::ParseError("Reader schema unavailable for resolution".into()) + })?; + let resolved = AvroField::resolve_from_writer_and_reader( + writer_schema, + &reader_schema, + self.utf8_view, + self.strict_mode, + )?; + RecordDecoder::try_new_with_options(resolved.data_type(), self.utf8_view)? 
+ }; + // Stage the new decoder and fingerprint to be activated after the next flush + self.pending_schema = Some((new_fingerprint, new_decoder)); + Ok(()) + } + + #[inline] + fn evict_cache(&mut self) { + while self.cache.len() > self.max_cache_size { + if let Some(lru_key) = self.cache.keys().next().cloned() { + self.cache.shift_remove(&lru_key); Review Comment: This will pay quadratic work for a cache with a lot of extra entries. Hopefully that's a rare case, though? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org