scovich commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2255515847


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -220,34 +320,119 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fingerprint` = None
     pub fn new() -> Self {
         Self::default()
     }
 
-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> 
Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
-            .with_utf8view(self.utf8_view)
-            .with_strict_mode(self.strict_mode)
-            .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), 
self.utf8_view)
+    fn make_record_decoder(
+        &self,
+        writer_schema: &Schema,
+        reader_schema: Option<&AvroSchema>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let root = match reader_schema {
+            Some(reader_schema) => {
+                
AvroFieldBuilder::new(writer_schema).with_reader_schema(reader_schema.clone())
+            }
+            _ => AvroFieldBuilder::new(writer_schema),
+        }

Review Comment:
   nit: a mutable builder avoids repeating `AvroFieldBuilder::new(writer_schema)` in both match arms:
   ```suggestion
           let mut builder = AvroFieldBuilder::new(writer_schema);
           if let Some(reader_schema) = reader_schema {
               builder = builder.with_reader_schema(reader_schema.clone());
           }
           let root = builder
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -158,39 +162,131 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.expect_prefix
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..])? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, 
ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if !self.expect_prefix {
+            return Ok(None);
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match self.fingerprint_algorithm {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        // NOTE: Incomplete fingerprint consumes no bytes.
+        let consumed = fingerprint_size.map_or(0, |n| n + 
SINGLE_OBJECT_MAGIC.len());
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Insufficient bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            let new_decoder = self.cache.shift_remove(&new_fingerprint);
+            let new_decoder = match new_decoder {
+                Some(decoder) => decoder,
+                None => {
+                    return Err(ArrowError::ParseError(format!(
+                        "Unknown fingerprint: {new_fingerprint:?}"
+                    )))
+                }
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called 
next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply any staged schema switch.
+        if let Some((new_fingerprint, new_decoder)) = 
self.pending_schema.take() {
+            if let Some(old_fingerprint) = 
self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, 
new_decoder);
+                {
+                    self.cache.shift_remove(&old_fingerprint);
+                    self.cache.insert(old_fingerprint, old_decoder);
+                }

Review Comment:
   The extra inner braces around these two statements look like leftovers from some earlier draft; they can be dropped:
   ```suggestion
                   self.cache.shift_remove(&old_fingerprint);
                   self.cache.insert(old_fingerprint, old_decoder);
   ```



##########
arrow-avro/src/reader/mod.rs:
##########
@@ -158,39 +162,131 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.expect_prefix
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..])? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, 
ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if !self.expect_prefix {
+            return Ok(None);
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match self.fingerprint_algorithm {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        // NOTE: Incomplete fingerprint consumes no bytes.
+        let consumed = fingerprint_size.map_or(0, |n| n + 
SINGLE_OBJECT_MAGIC.len());
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Insufficient bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            let new_decoder = self.cache.shift_remove(&new_fingerprint);
+            let new_decoder = match new_decoder {
+                Some(decoder) => decoder,
+                None => {
+                    return Err(ArrowError::ParseError(format!(
+                        "Unknown fingerprint: {new_fingerprint:?}"
+                    )))
+                }
+            };

Review Comment:
   ```suggestion
               let Some(new_decoder) = 
self.cache.shift_remove(&new_fingerprint) else {
                   return Err(ArrowError::ParseError(format!(
                       "Unknown fingerprint: {new_fingerprint:?}"
                   )));
               };
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to