jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2251862329


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -216,34 +369,91 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fingerprint` = None
     pub fn new() -> Self {
         Self::default()
     }
 
-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> 
Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
-            .with_utf8view(self.utf8_view)
-            .with_strict_mode(self.strict_mode)
-            .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), 
self.utf8_view)
+    fn make_record_decoder<'a>(
+        &self,
+        writer_schema: &AvroSchema<'a>,
+        reader_schema: Option<&AvroSchema<'a>>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let root = match reader_schema {
+            Some(reader_schema) if !compare_schemas(writer_schema, 
reader_schema)? => {
+                
AvroFieldBuilder::new(writer_schema).with_reader_schema(reader_schema)
+            }
+            _ => AvroFieldBuilder::new(writer_schema),
+        }
+        .with_utf8view(self.utf8_view)
+        .with_strict_mode(self.strict_mode)
+        .build()?;
+        RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
+    }
+
+    fn make_decoder_with_parts(
+        &self,
+        active_decoder: RecordDecoder,
+        active_fingerprint: Option<Fingerprint>,
+        reader_schema: Option<AvroSchema<'static>>,
+        writer_schema_store: Option<SchemaStore<'static>>,
+    ) -> Decoder {
+        #[cfg(feature = "lru")]
+        let capacity = 
NonZeroUsize::new(self.decoder_cache_size).unwrap_or(NonZeroUsize::MIN); // 
NonZeroUsize::MIN is 1
+        Decoder {
+            batch_size: self.batch_size,
+            remaining_capacity: self.batch_size,
+            active_fingerprint,
+            active_decoder,
+            #[cfg(feature = "lru")]
+            cache: LruCache::new(capacity),
+            #[cfg(not(feature = "lru"))]
+            cache: IndexMap::new(),
+            max_cache_size: self.decoder_cache_size,
+            reader_schema,
+            utf8_view: self.utf8_view,
+            writer_schema_store,
+            strict_mode: self.strict_mode,
+            pending_schema: None,
+        }
     }
 
-    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, 
Decoder), ArrowError> {
-        let header = read_header(reader)?;
-        let record_decoder = if let Some(schema) = &self.schema {
-            self.make_record_decoder(schema)?
-        } else {
-            let avro_schema: Option<AvroSchema<'_>> = header
+    fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder, 
ArrowError> {
+        if let Some(hdr) = header {
+            let writer_schema = hdr
                 .schema()
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-            let avro_schema = avro_schema.ok_or_else(|| {
-                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file 
header".into())
+                })?;
+            let record_decoder =
+                self.make_record_decoder(&writer_schema, 
self.reader_schema.as_ref())?;
+            return Ok(self.make_decoder_with_parts(record_decoder, None, None, 
None));
+        }
+        let writer_schema_store = 
self.writer_schema_store.as_ref().ok_or_else(|| {
+            ArrowError::ParseError("Writer schema store required for raw 
Avro".into())
+        })?;
+        let fingerprint = self
+            .active_fingerprint
+            .or_else(|| writer_schema_store.fingerprints().into_iter().next())
+            .ok_or_else(|| {
+                ArrowError::ParseError(
+                    "Writer schema store must contain at least one 
schema".into(),
+                )
             })?;
-            self.make_record_decoder(&avro_schema)?
-        };
-        let decoder = Decoder::new(record_decoder, self.batch_size);
-        Ok((header, decoder))
+        let writer_schema = 
writer_schema_store.lookup(&fingerprint).ok_or_else(|| {
+            ArrowError::ParseError("Active fingerprint not found in schema 
store".into())
+        })?;
+        let record_decoder =
+            self.make_record_decoder(writer_schema, 
self.reader_schema.as_ref())?;
+        Ok(self.make_decoder_with_parts(
+            record_decoder,
+            Some(fingerprint),
+            self.reader_schema.clone(),
+            self.writer_schema_store.clone(),

Review Comment:
   > But what about the reader schema? What's the story behind the smell there?
   
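   What's happening here is that the file read path never switches
`RecordDecoder`s during decoding, so it doesn't need a `reader_schema` pushed
down into the `Decoder`. The reader schema is applied once, when the initial
`RecordDecoder` is built from the header's writer schema, and that decoder is
the only one the file read path ever uses (which is why `make_decoder` passes
`None` for the fingerprint, reader schema, and schema store there), i.e.: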
   
   ```rust
   let record_decoder =
       self.make_record_decoder(&writer_schema, self.reader_schema.as_ref())?;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to