jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2251938878
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +165,175 @@ impl Decoder {
///
/// Returns the number of bytes consumed.
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+ if self.writer_schema_store.is_some()
+ && data.len() >= SINGLE_OBJECT_MAGIC.len()
+ && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+ {
+ return Err(ArrowError::ParseError(
+ "Expected single‑object encoding fingerprint prefix for first
message \
+ (writer_schema_store is set but active_fingerprint is None)"
+ .into(),
+ ));
+ }
let mut total_consumed = 0usize;
- while total_consumed < data.len() && self.decoded_rows <
self.batch_size {
- let consumed = self.record_decoder.decode(&data[total_consumed..],
1)?;
- // A successful call to record_decoder.decode means one row was
decoded.
- // If `consumed` is 0 on a non-empty buffer, it implies a valid
zero-byte record.
- // We increment `decoded_rows` to mark progress and avoid an
infinite loop.
- // We add `consumed` (which can be 0) to `total_consumed`.
- total_consumed += consumed;
- self.decoded_rows += 1;
+ let hash_type = self.writer_schema_store.as_ref().map_or(
+ FingerprintAlgorithm::Rabin,
+ SchemaStore::fingerprint_algorithm,
+ );
+ // The loop stops when the batch is full, a schema change is staged,
+ // or handle_prefix indicates we need more bytes (Some(0)).
+ while total_consumed < data.len() && self.remaining_capacity > 0 {
+ if let Some(n) = self.handle_prefix(&data[total_consumed..],
hash_type)? {
+ // We either consumed a prefix (n > 0) and need a schema
switch, or we need
+ // more bytes to make a decision. Either way, this decoding
attempt is finished.
+ total_consumed += n;
+ }
+ // No prefix: decode one row and keep going.
+ let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+ self.remaining_capacity -= 1;
+ total_consumed += n;
}
Ok(total_consumed)
}
+ // Attempt to handle a single‑object‑encoding prefix at the current
position.
+ //
+ // * Ok(None) – buffer does not start with the prefix.
+ // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller
should await more bytes.
+ // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and
fingerprint).
+ fn handle_prefix(
+ &mut self,
+ buf: &[u8],
+ hash_type: FingerprintAlgorithm,
+ ) -> Result<Option<usize>, ArrowError> {
+ // If there is no schema store, prefixes are unrecognized.
+ if self.writer_schema_store.is_none() {
+ return Ok(None); // Continue to decode the next record
+ }
+ // Need at least the magic bytes to decide (2 bytes).
+ let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+ return Ok(Some(0)); // Get more bytes
+ };
+ // Bail out early if the magic does not match.
+ if magic_bytes != SINGLE_OBJECT_MAGIC {
+ return Ok(None); // Continue to decode the next record
+ }
+ // Try to parse the fingerprint that follows the magic.
+ let fingerprint_size = match hash_type {
+ FingerprintAlgorithm::Rabin => self
+ .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes|
{
+ Fingerprint::Rabin(u64::from_le_bytes(bytes))
+ })?,
+ };
+ // Convert the inner result into a “bytes consumed” count.
+ let consumed = match fingerprint_size {
+ Some(n) => n + SINGLE_OBJECT_MAGIC.len(), // magic + fingerprint
+ None => 0, // incomplete fingerprint
+ };
+ Ok(Some(consumed))
+ }
+
+ // Attempts to read and install a new fingerprint of `N` bytes.
+ //
+ // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+ // * Ok(Some(N)) – fingerprint consumed (always `N`).
+ fn handle_fingerprint<const N: usize>(
+ &mut self,
+ buf: &[u8],
+ fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+ ) -> Result<Option<usize>, ArrowError> {
+ // Need enough bytes to get fingerprint (next N bytes)
+ let Some(fingerprint_bytes) = buf.get(..N) else {
+ return Ok(None); // Get more bytes
+ };
+ // SAFETY: length checked above.
+ let new_fingerprint =
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+ // If the fingerprint indicates a schema change, prepare to switch
decoders.
+ if self.active_fingerprint != Some(new_fingerprint) {
+ #[cfg(feature = "lru")]
+ let new_decoder = match self.cache.pop(&new_fingerprint) {
+ Some(decoder) => decoder,
+ None => self.create_decoder_for(new_fingerprint)?,
+ };
+ #[cfg(not(feature = "lru"))]
+ let new_decoder = match self.cache.shift_remove(&new_fingerprint) {
+ Some(decoder) => decoder,
+ None => self.create_decoder_for(new_fingerprint)?,
+ };
+ self.pending_schema = Some((new_fingerprint, new_decoder));
+ // If there are already decoded rows, we must flush them first.
+ // Reducing `remaining_capacity` to 0 ensures `flush` is called
next.
+ if self.remaining_capacity < self.batch_size {
+ self.remaining_capacity = 0;
+ }
+ }
+ Ok(Some(N))
+ }
+
+ fn create_decoder_for(
+ &mut self,
+ new_fingerprint: Fingerprint,
+ ) -> Result<RecordDecoder, ArrowError> {
+ let writer_schema_store = self
+ .writer_schema_store
+ .as_ref()
+ .ok_or_else(|| ArrowError::ParseError("Schema store
unavailable".into()))?;
+ let writer_schema = writer_schema_store
+ .lookup(&new_fingerprint)
+ .ok_or_else(|| {
+ ArrowError::ParseError(format!("Unknown fingerprint:
{new_fingerprint:?}"))
+ })?;
+ let Some(ref reader_schema) = self.reader_schema else {
Review Comment:
> Asking because it seems like most file read paths ignore the read schema?
The file read path doesn't change the `RecordDecoder` during decoding, which is
probably why it looks that way.
A lot of the complexity comes from using the same `decode` method for both
file reads and raw reads. I'm going to iterate on that in my next PR, which
should simplify things quite a bit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]