jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2234569451
########## arrow-avro/src/reader/mod.rs: ########## @@ -216,73 +440,227 @@ impl ReaderBuilder { /// - `batch_size` = 1024 /// - `strict_mode` = false /// - `utf8_view` = false - /// - `schema` = None + /// - `reader_schema` = None + /// - `writer_schema_store` = None + /// - `active_fp` = None + /// - `static_store_mode` = false pub fn new() -> Self { Self::default() } - fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> { - let root_field = AvroFieldBuilder::new(schema) - .with_utf8view(self.utf8_view) - .with_strict_mode(self.strict_mode) - .build()?; - RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view) + /// Sets the maximum number of rows to include in each `RecordBatch`. + /// + /// Defaults to `1024`. + pub fn with_batch_size(mut self, n: usize) -> Self { + self.batch_size = n; + self } - fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> { - let header = read_header(reader)?; - let record_decoder = if let Some(schema) = &self.schema { - self.make_record_decoder(schema)? - } else { - let avro_schema: Option<AvroSchema<'_>> = header - .schema() - .map_err(|e| ArrowError::ExternalError(Box::new(e)))?; - let avro_schema = avro_schema.ok_or_else(|| { - ArrowError::ParseError("No Avro schema present in file header".to_string()) - })?; - self.make_record_decoder(&avro_schema)? - }; - let decoder = Decoder::new(record_decoder, self.batch_size); - Ok((header, decoder)) + /// Configures the reader to decode string data into `StringViewArray`. + /// + /// When enabled, string data is decoded into `StringViewArray` instead of `StringArray`. + /// This can improve performance for strings that are frequently accessed. + /// + /// Defaults to `false`. 
+ pub fn with_utf8_view(mut self, enabled: bool) -> Self { + self.utf8_view = enabled; + self + } + + /// Get whether StringViewArray is enabled for string data + pub fn use_utf8view(&self) -> bool { + self.utf8_view + } + + /// Enables or disables strict schema resolution mode. + /// + /// When enabled (`true`), an error is returned if a field in the writer's schema + /// cannot be resolved to a field in the reader's schema. When disabled (`false`), + /// any unresolvable fields are simply skipped during decoding. + /// + /// Defaults to `false`. + pub fn with_strict_mode(mut self, enabled: bool) -> Self { + self.strict_mode = enabled; + self } - /// Sets the row-based batch size - pub fn with_batch_size(mut self, batch_size: usize) -> Self { - self.batch_size = batch_size; + /// Sets the reader's Avro schema, which the decoded data will be projected into. + /// + /// If a reader schema is provided, the decoder will perform schema resolution, + /// converting data from the writer's schema (read from the file or schema store) + /// to the specified reader schema. If not set, the writer's schema is used. + /// + /// Defaults to `None`. + pub fn with_reader_schema(mut self, s: AvroSchema<'static>) -> Self { + self.reader_schema = Some(s); self } - /// Set whether to use StringViewArray for string data + /// Sets the `SchemaStore` used for resolving writer schemas. /// - /// When enabled, string data from Avro files will be loaded into - /// Arrow's StringViewArray instead of the standard StringArray. - pub fn with_utf8_view(mut self, utf8_view: bool) -> Self { - self.utf8_view = utf8_view; + /// This is necessary when decoding single-object encoded data that identifies + /// schemas by a fingerprint. The store allows the decoder to look up the + /// full writer schema from a fingerprint embedded in the data. + /// + /// Defaults to `None`. 
+ pub fn with_writer_schema_store(mut self, store: SchemaStore<'static>) -> Self { + self.writer_schema_store = Some(store); self } - /// Get whether StringViewArray is enabled for string data - pub fn use_utf8view(&self) -> bool { - self.utf8_view + /// Sets the initial schema fingerprint for decoding single-object encoded data. + /// + /// This is useful when the data stream does not begin with a schema definition + /// or fingerprint, allowing the decoder to start with a known schema from the + /// `SchemaStore`. + /// + /// Defaults to `None`. + pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self { + self.active_fp = Some(fp); + self } - /// Controls whether certain Avro unions of the form `[T, "null"]` should produce an error. - pub fn with_strict_mode(mut self, strict_mode: bool) -> Self { - self.strict_mode = strict_mode; + /// If `true`, all schemas must be pre-registered in the `SchemaStore`. + /// + /// When this mode is enabled, decoding will fail if a schema fingerprint is + /// encountered that does not already exist in the store. This prevents the + /// dynamic resolution of schemas and ensures that only known schemas are used. + /// + /// Defaults to `false`. + pub fn with_static_store_mode(mut self, enabled: bool) -> Self { + self.static_store_mode = enabled; self } - /// Sets the Avro schema. + /// Set the maximum number of decoders to cache. /// - /// If a schema is not provided, the schema will be read from the Avro file header. - pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self { - self.schema = Some(schema); + /// When dealing with Avro files that contain multiple schemas, we may need to switch + /// between different decoders. This cache avoids rebuilding them from scratch every time. + /// + /// Defaults to `20`. 
+ pub fn with_max_decoder_cache_size(mut self, n: usize) -> Self { + self.decoder_cache_size = n; self } - /// Create a [`Reader`] from this builder and a `BufRead` + fn validate(&self) -> Result<(), ArrowError> { + match ( + self.writer_schema_store.as_ref(), + self.reader_schema.as_ref(), + self.active_fp.as_ref(), + self.static_store_mode, + ) { + (Some(_), None, _, _) => Err(ArrowError::ParseError( + "Reader schema must be set when writer schema store is provided".into(), + )), + (None, _, Some(_), _) => Err(ArrowError::ParseError( + "Active fingerprint requires a writer schema store".into(), + )), + (None, _, _, true) => Err(ArrowError::ParseError( + "static_store_mode=true requires a writer schema store".into(), + )), + (Some(_), _, None, true) => Err(ArrowError::ParseError( + "static_store_mode=true requires an active fingerprint".into(), + )), + _ => Ok(()), + } + } + + fn make_record_decoder<'a>( + &self, + writer_schema: &AvroSchema<'a>, + reader_schema: Option<&AvroSchema<'a>>, + ) -> Result<RecordDecoder, ArrowError> { + let field_builder = match reader_schema { + Some(rs) if !compare_schemas(writer_schema, rs) => { + AvroFieldBuilder::new(writer_schema).with_reader_schema(rs) + } + Some(rs) => AvroFieldBuilder::new(rs), + None => AvroFieldBuilder::new(writer_schema), + } + .with_utf8view(self.utf8_view) + .with_strict_mode(self.strict_mode); + let root = field_builder.build()?; + RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view) + } + + fn make_decoder(&self, header: Option<&Header>) -> Result<Decoder, ArrowError> { + match header { + Some(hdr) => { + let writer_schema = hdr + .schema() + .map_err(|e| ArrowError::ExternalError(Box::new(e)))? 
+ .ok_or_else(|| { + ArrowError::ParseError("No Avro schema present in file header".into()) + })?; + let record_decoder = + self.make_record_decoder(&writer_schema, self.reader_schema.as_ref())?; + Ok(Decoder { + batch_size: self.batch_size, + decoded_rows: 0, + active_fp: None, // Not used for container files + active_decoder: record_decoder, + cache: HashMap::new(), + lru: VecDeque::new(), + max_cache_size: self.decoder_cache_size, + reader_schema: None, + utf8_view: self.utf8_view, + schema_store: None, + static_store_mode: true, // The writer schema is in the container file header + strict_mode: self.strict_mode, + pending_fp: None, + pending_decoder: None, + }) + } + None => { + let reader_schema = self.reader_schema.clone().ok_or_else(|| { + ArrowError::ParseError("Reader schema required for raw Avro".into()) + })?; + let (init_fp, initial_decoder) = match (&self.writer_schema_store, self.active_fp) { + // An initial fingerprint is provided, use it to look up the first schema. + (Some(schema_store), Some(fp)) => { + let writer_schema = schema_store.lookup(&fp).ok_or_else(|| { + ArrowError::ParseError( + "Active fingerprint not found in schema store".into(), + ) + })?; + let dec = self.make_record_decoder(&writer_schema, Some(&reader_schema))?; + (Some(fp), dec) + } + // No initial fingerprint; the first record must contain one. + // A temporary decoder is created from the reader schema. + _ => { + let dec = self.make_record_decoder(&reader_schema, None)?; + (None, dec) + } + }; + Ok(Decoder { + batch_size: self.batch_size, + decoded_rows: 0, + active_fp: init_fp, + active_decoder: initial_decoder, + cache: HashMap::new(), + lru: VecDeque::new(), Review Comment: That was a good call out. I included this abstraction in my latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org