jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2249116924


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -154,39 +158,178 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.active_fingerprint.is_none()
+            && self.writer_schema_store.is_some()
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        let hash_type = self.writer_schema_store.as_ref().map_or(
+            FingerprintAlgorithm::Rabin,
+            SchemaStore::fingerprint_algorithm,
+        );
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            match self.handle_prefix(&data[total_consumed..], hash_type)? {
+                None => {
+                    // No prefix: decode one row.
+                    let n = 
self.active_decoder.decode(&data[total_consumed..], 1)?;
+                    total_consumed += n;
+                    self.remaining_capacity -= 1;
+                }
+                Some(0) => {
+                    // Detected start of a prefix but need more bytes.
+                    break;
+                }
+                Some(n) => {
+                    // Consumed a complete prefix (n > 0). Stage flush and 
stop.
+                    total_consumed += n;
+                    break;
+                }
+            }
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(
+        &mut self,
+        buf: &[u8],
+        hash_type: FingerprintAlgorithm,
+    ) -> Result<Option<usize>, ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if self.writer_schema_store.is_none() {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.

Review Comment:
   Unit tests covering these corner cases were included in my latest push.
   
   I also improved the `test_decode_stream_with_schema` test to check that the 
prefix parsing works correctly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to