jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2234569451


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -216,73 +440,227 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fp` = None
+    /// - `static_store_mode` = false
     pub fn new() -> Self {
         Self::default()
     }
 
-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> 
Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
-            .with_utf8view(self.utf8_view)
-            .with_strict_mode(self.strict_mode)
-            .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), 
self.utf8_view)
+    /// Sets the maximum number of rows to include in each `RecordBatch`.
+    ///
+    /// Defaults to `1024`.
+    pub fn with_batch_size(mut self, n: usize) -> Self {
+        self.batch_size = n;
+        self
     }
 
-    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, 
Decoder), ArrowError> {
-        let header = read_header(reader)?;
-        let record_decoder = if let Some(schema) = &self.schema {
-            self.make_record_decoder(schema)?
-        } else {
-            let avro_schema: Option<AvroSchema<'_>> = header
-                .schema()
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-            let avro_schema = avro_schema.ok_or_else(|| {
-                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
-            })?;
-            self.make_record_decoder(&avro_schema)?
-        };
-        let decoder = Decoder::new(record_decoder, self.batch_size);
-        Ok((header, decoder))
+    /// Configures the reader to decode string data into `StringViewArray`.
+    ///
+    /// When enabled, string data is decoded into `StringViewArray` instead of 
`StringArray`.
+    /// This can improve performance for strings that are frequently accessed.
+    ///
+    /// Defaults to `false`.
+    pub fn with_utf8_view(mut self, enabled: bool) -> Self {
+        self.utf8_view = enabled;
+        self
+    }
+
+    /// Get whether StringViewArray is enabled for string data
+    pub fn use_utf8view(&self) -> bool {
+        self.utf8_view
+    }
+
+    /// Enables or disables strict schema resolution mode.
+    ///
+    /// When enabled (`true`), an error is returned if a field in the writer's 
schema
+    /// cannot be resolved to a field in the reader's schema. When disabled 
(`false`),
+    /// any unresolvable fields are simply skipped during decoding.
+    ///
+    /// Defaults to `false`.
+    pub fn with_strict_mode(mut self, enabled: bool) -> Self {
+        self.strict_mode = enabled;
+        self
     }
 
-    /// Sets the row-based batch size
-    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
-        self.batch_size = batch_size;
+    /// Sets the reader's Avro schema, which the decoded data will be 
projected into.
+    ///
+    /// If a reader schema is provided, the decoder will perform schema 
resolution,
+    /// converting data from the writer's schema (read from the file or schema 
store)
+    /// to the specified reader schema. If not set, the writer's schema is 
used.
+    ///
+    /// Defaults to `None`.
+    pub fn with_reader_schema(mut self, s: AvroSchema<'static>) -> Self {
+        self.reader_schema = Some(s);
         self
     }
 
-    /// Set whether to use StringViewArray for string data
+    /// Sets the `SchemaStore` used for resolving writer schemas.
     ///
-    /// When enabled, string data from Avro files will be loaded into
-    /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
-        self.utf8_view = utf8_view;
+    /// This is necessary when decoding single-object encoded data that 
identifies
+    /// schemas by a fingerprint. The store allows the decoder to look up the
+    /// full writer schema from a fingerprint embedded in the data.
+    ///
+    /// Defaults to `None`.
+    pub fn with_writer_schema_store(mut self, store: SchemaStore<'static>) -> 
Self {
+        self.writer_schema_store = Some(store);
         self
     }
 
-    /// Get whether StringViewArray is enabled for string data
-    pub fn use_utf8view(&self) -> bool {
-        self.utf8_view
+    /// Sets the initial schema fingerprint for decoding single-object encoded 
data.
+    ///
+    /// This is useful when the data stream does not begin with a schema 
definition
+    /// or fingerprint, allowing the decoder to start with a known schema from 
the
+    /// `SchemaStore`.
+    ///
+    /// Defaults to `None`.
+    pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self {
+        self.active_fp = Some(fp);
+        self
     }
 
-    /// Controls whether certain Avro unions of the form `[T, "null"]` should 
produce an error.
-    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
-        self.strict_mode = strict_mode;
+    /// If `true`, all schemas must be pre-registered in the `SchemaStore`.
+    ///
+    /// When this mode is enabled, decoding will fail if a schema fingerprint 
is
+    /// encountered that does not already exist in the store. This prevents the
+    /// dynamic resolution of schemas and ensures that only known schemas are 
used.
+    ///
+    /// Defaults to `false`.
+    pub fn with_static_store_mode(mut self, enabled: bool) -> Self {
+        self.static_store_mode = enabled;
         self
     }
 
-    /// Sets the Avro schema.
+    /// Set the maximum number of decoders to cache.
     ///
-    /// If a schema is not provided, the schema will be read from the Avro 
file header.
-    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
-        self.schema = Some(schema);
+    /// When dealing with Avro files that contain multiple schemas, we may 
need to switch
+    /// between different decoders. This cache avoids rebuilding them from 
scratch every time.
+    ///
+    /// Defaults to `20`.
+    pub fn with_max_decoder_cache_size(mut self, n: usize) -> Self {
+        self.decoder_cache_size = n;
         self
     }
 
-    /// Create a [`Reader`] from this builder and a `BufRead`
+    fn validate(&self) -> Result<(), ArrowError> {
+        match (
+            self.writer_schema_store.as_ref(),
+            self.reader_schema.as_ref(),
+            self.active_fp.as_ref(),
+            self.static_store_mode,
+        ) {
+            (Some(_), None, _, _) => Err(ArrowError::ParseError(
+                "Reader schema must be set when writer schema store is 
provided".into(),
+            )),
+            (None, _, Some(_), _) => Err(ArrowError::ParseError(
+                "Active fingerprint requires a writer schema store".into(),
+            )),
+            (None, _, _, true) => Err(ArrowError::ParseError(
+                "static_store_mode=true requires a writer schema store".into(),
+            )),
+            (Some(_), _, None, true) => Err(ArrowError::ParseError(
+                "static_store_mode=true requires an active fingerprint".into(),
+            )),
+            _ => Ok(()),
+        }
+    }
+
+    fn make_record_decoder<'a>(
+        &self,
+        writer_schema: &AvroSchema<'a>,
+        reader_schema: Option<&AvroSchema<'a>>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let field_builder = match reader_schema {
+            Some(rs) if !compare_schemas(writer_schema, rs) => {
+                AvroFieldBuilder::new(writer_schema).with_reader_schema(rs)
+            }
+            Some(rs) => AvroFieldBuilder::new(rs),
+            None => AvroFieldBuilder::new(writer_schema),
+        }
+        .with_utf8view(self.utf8_view)
+        .with_strict_mode(self.strict_mode);
+        let root = field_builder.build()?;
+        RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
+    }
+
+    fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder, 
ArrowError> {
+        match header {
+            Some(hdr) => {
+                let writer_schema = hdr
+                    .schema()
+                    .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                    .ok_or_else(|| {
+                        ArrowError::ParseError("No Avro schema present in file 
header".into())
+                    })?;
+                let record_decoder =
+                    self.make_record_decoder(&writer_schema, 
self.reader_schema.as_ref())?;
+                Ok(Decoder {
+                    batch_size: self.batch_size,
+                    decoded_rows: 0,
+                    active_fp: None, // Not used for container files
+                    active_decoder: record_decoder,
+                    cache: HashMap::new(),
+                    lru: VecDeque::new(),
+                    max_cache_size: self.decoder_cache_size,
+                    reader_schema: None,
+                    utf8_view: self.utf8_view,
+                    schema_store: None,
+                    static_store_mode: true, // The writer schema is in the 
container file header
+                    strict_mode: self.strict_mode,
+                    pending_fp: None,
+                    pending_decoder: None,
+                })
+            }
+            None => {
+                let reader_schema = self.reader_schema.clone().ok_or_else(|| {
+                    ArrowError::ParseError("Reader schema required for raw 
Avro".into())
+                })?;
+                let (init_fp, initial_decoder) = match 
(&self.writer_schema_store, self.active_fp) {
+                    // An initial fingerprint is provided, use it to look up 
the first schema.
+                    (Some(schema_store), Some(fp)) => {
+                        let writer_schema = 
schema_store.lookup(&fp).ok_or_else(|| {
+                            ArrowError::ParseError(
+                                "Active fingerprint not found in schema 
store".into(),
+                            )
+                        })?;
+                        let dec = self.make_record_decoder(&writer_schema, 
Some(&reader_schema))?;
+                        (Some(fp), dec)
+                    }
+                    // No initial fingerprint; the first record must contain 
one.
+                    // A temporary decoder is created from the reader schema.
+                    _ => {
+                        let dec = self.make_record_decoder(&reader_schema, 
None)?;
+                        (None, dec)
+                    }
+                };
+                Ok(Decoder {
+                    batch_size: self.batch_size,
+                    decoded_rows: 0,
+                    active_fp: init_fp,
+                    active_decoder: initial_decoder,
+                    cache: HashMap::new(),
+                    lru: VecDeque::new(),

Review Comment:
   That was a good callout. I included this abstraction in my latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to