jecsand838 commented on code in PR #8006:
URL: https://github.com/apache/arrow-rs/pull/8006#discussion_r2234663358
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -216,73 +440,227 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fp` = None
+    /// - `static_store_mode` = false
     pub fn new() -> Self {
         Self::default()
     }

-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
-            .with_utf8view(self.utf8_view)
-            .with_strict_mode(self.strict_mode)
-            .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view)
+    /// Sets the maximum number of rows to include in each `RecordBatch`.
+    ///
+    /// Defaults to `1024`.
+    pub fn with_batch_size(mut self, n: usize) -> Self {
+        self.batch_size = n;
+        self
     }

-    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
-        let header = read_header(reader)?;
-        let record_decoder = if let Some(schema) = &self.schema {
-            self.make_record_decoder(schema)?
-        } else {
-            let avro_schema: Option<AvroSchema<'_>> = header
-                .schema()
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-            let avro_schema = avro_schema.ok_or_else(|| {
-                ArrowError::ParseError("No Avro schema present in file header".to_string())
-            })?;
-            self.make_record_decoder(&avro_schema)?
-        };
-        let decoder = Decoder::new(record_decoder, self.batch_size);
-        Ok((header, decoder))
+    /// Configures the reader to decode string data into `StringViewArray`.
+    ///
+    /// When enabled, string data is decoded into `StringViewArray` instead of `StringArray`.
+    /// This can improve performance for strings that are frequently accessed.
+    ///
+    /// Defaults to `false`.
+    pub fn with_utf8_view(mut self, enabled: bool) -> Self {
+        self.utf8_view = enabled;
+        self
+    }
+
+    /// Get whether StringViewArray is enabled for string data
+    pub fn use_utf8view(&self) -> bool {
+        self.utf8_view
+    }
+
+    /// Enables or disables strict schema resolution mode.
+    ///
+    /// When enabled (`true`), an error is returned if a field in the writer's schema
+    /// cannot be resolved to a field in the reader's schema. When disabled (`false`),
+    /// any unresolvable fields are simply skipped during decoding.
+    ///
+    /// Defaults to `false`.
+    pub fn with_strict_mode(mut self, enabled: bool) -> Self {
+        self.strict_mode = enabled;
+        self
     }

-    /// Sets the row-based batch size
-    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
-        self.batch_size = batch_size;
+    /// Sets the reader's Avro schema, which the decoded data will be projected into.
+    ///
+    /// If a reader schema is provided, the decoder will perform schema resolution,
+    /// converting data from the writer's schema (read from the file or schema store)
+    /// to the specified reader schema. If not set, the writer's schema is used.
+    ///
+    /// Defaults to `None`.
+    pub fn with_reader_schema(mut self, s: AvroSchema<'static>) -> Self {
+        self.reader_schema = Some(s);
         self
     }

-    /// Set whether to use StringViewArray for string data
+    /// Sets the `SchemaStore` used for resolving writer schemas.
     ///
-    /// When enabled, string data from Avro files will be loaded into
-    /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
-        self.utf8_view = utf8_view;
+    /// This is necessary when decoding single-object encoded data that identifies
+    /// schemas by a fingerprint. The store allows the decoder to look up the
+    /// full writer schema from a fingerprint embedded in the data.
+    ///
+    /// Defaults to `None`.
+    pub fn with_writer_schema_store(mut self, store: SchemaStore<'static>) -> Self {
+        self.writer_schema_store = Some(store);
         self
     }

-    /// Get whether StringViewArray is enabled for string data
-    pub fn use_utf8view(&self) -> bool {
-        self.utf8_view
+    /// Sets the initial schema fingerprint for decoding single-object encoded data.
+    ///
+    /// This is useful when the data stream does not begin with a schema definition
+    /// or fingerprint, allowing the decoder to start with a known schema from the
+    /// `SchemaStore`.
+    ///
+    /// Defaults to `None`.
+    pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self {
+        self.active_fp = Some(fp);
+        self
     }

-    /// Controls whether certain Avro unions of the form `[T, "null"]` should produce an error.
-    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
-        self.strict_mode = strict_mode;
+    /// If `true`, all schemas must be pre-registered in the `SchemaStore`.
+    ///
+    /// When this mode is enabled, decoding will fail if a schema fingerprint is
+    /// encountered that does not already exist in the store. This prevents the
+    /// dynamic resolution of schemas and ensures that only known schemas are used.
+    ///
+    /// Defaults to `false`.
+    pub fn with_static_store_mode(mut self, enabled: bool) -> Self {
+        self.static_store_mode = enabled;
         self
     }

-    /// Sets the Avro schema.
+    /// Set the maximum number of decoders to cache.
     ///
-    /// If a schema is not provided, the schema will be read from the Avro file header.
-    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
-        self.schema = Some(schema);
+    /// When dealing with Avro files that contain multiple schemas, we may need to switch
+    /// between different decoders. This cache avoids rebuilding them from scratch every time.
+    ///
+    /// Defaults to `20`.
+    pub fn with_max_decoder_cache_size(mut self, n: usize) -> Self {
+        self.decoder_cache_size = n;
         self
     }

-    /// Create a [`Reader`] from this builder and a `BufRead`
+    fn validate(&self) -> Result<(), ArrowError> {
+        match (
+            self.writer_schema_store.as_ref(),
+            self.reader_schema.as_ref(),
+            self.active_fp.as_ref(),
+            self.static_store_mode,
+        ) {
+            (Some(_), None, _, _) => Err(ArrowError::ParseError(
+                "Reader schema must be set when writer schema store is provided".into(),
+            )),
+            (None, _, Some(_), _) => Err(ArrowError::ParseError(
+                "Active fingerprint requires a writer schema store".into(),
+            )),
+            (None, _, _, true) => Err(ArrowError::ParseError(
+                "static_store_mode=true requires a writer schema store".into(),
+            )),
+            (Some(_), _, None, true) => Err(ArrowError::ParseError(

Review Comment:
> All these invalid combos make me wonder if we have the wrong set of builder methods? Could a more constrained set of methods make it easier for callers to get it right?

Honestly, I had this thought as well. It's tricky because of the performance impact of schema resolution and trying to manage it, i.e. why should a user pay an unnecessary performance cost for a setting they aren't using? I can think about simplification, and I'm absolutely open to any ideas. One thing I considered was having separate builders for the `Reader` and `Decoder`, but that didn't align with the pattern followed by `arrow-json` and `arrow-csv`.

> Meanwhile, I think we always have to compute the fingerprint for the writer schema? It would be weird not to cache it on schema change just because the user forgot to supply a fingerprint. And if the user did supply a fingerprint, we have to verify it's correct (so I'm not sure why it's helpful for them to provide it in the first place)?

The idea was that the caller would be responsible for registering all of their writer schemas in the `SchemaStore` and setting the `reader_schema`. If there's a particular `Schema` they know will be used initially, they can simply grab the `Fingerprint` returned by the `register` call and pass it to `.with_active_fingerprint()`. Doing that ensures two things for them:

1. The first Avro record doesn't need to be a single-object encoding carrying the fingerprint.
2. Their `RecordDecoder` is already initialized and ready to go.

NOTE: This only applies to streaming decoding. When decoding an Avro file, the writer schema is in the `Header`; a `reader_schema` still needs to be set for schema resolution to occur.
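To make that flow concrete, here's a rough sketch (illustration only: `SchemaStore::new`, `build_decoder`, and the module paths are assumed names, not a committed API; the load-bearing part is `register` handing back the `Fingerprint`):

```rust
use arrow_avro::reader::{Decoder, ReaderBuilder}; // module paths assumed
use arrow_avro::schema::{AvroSchema, Fingerprint, SchemaStore};
use arrow_schema::ArrowError;

fn streaming_decoder(
    writer_v1: AvroSchema<'static>,
    writer_v2: AvroSchema<'static>,
    reader_schema: AvroSchema<'static>,
) -> Result<Decoder, ArrowError> {
    // Register every writer schema the stream might use up front.
    let mut store = SchemaStore::new(); // assumed constructor
    let fp: Fingerprint = store.register(writer_v1)?; // register returns the fingerprint
    store.register(writer_v2)?;
    ReaderBuilder::new()
        .with_reader_schema(reader_schema) // required whenever a store is provided
        .with_writer_schema_store(store)
        // Start on a known schema: the first record doesn't need a
        // single-object fingerprint prefix, and its RecordDecoder is
        // built eagerly rather than on first use.
        .with_active_fingerprint(fp)
        .build_decoder() // assumed build method for the streaming Decoder
}
```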
> Also: How do static_store_mode and schema_store parameters relate to each other?
> I got a bit lost trying to follow the logic that uses them.

`static_store_mode` lets the caller set a `writer_schema_store` while staying pinned to the same `writer_schema`. The idea was to let the caller still get schema resolution in a streaming decoder without paying for the two-byte lookup on each record decode. If you need to change the `writer_schema` with `static_store_mode=true`, you'd have to rebuild the `Decoder` with a different `active_fingerprint`. For example, if I know the active `writer_schema` will never change and that I'll never receive a single-object encoding referencing another schema, why pay the two-byte peek overhead on every record? That was the intention, at least.
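In builder terms (same caveats about assumed names as the sketch above), the static case would look something like:

```rust
fn static_decoder(
    writer_schema: AvroSchema<'static>,
    reader_schema: AvroSchema<'static>,
) -> Result<Decoder, ArrowError> {
    // Writer schema is fixed for the life of the stream: pin it via the
    // active fingerprint and opt out of the per-record two-byte peek.
    let mut store = SchemaStore::new(); // assumed constructor, as above
    let fp = store.register(writer_schema)?;
    ReaderBuilder::new()
        .with_reader_schema(reader_schema) // schema resolution still happens
        .with_writer_schema_store(store)
        .with_active_fingerprint(fp) // validate() requires this when static
        .with_static_store_mode(true) // never switch writer schemas mid-stream
        .build_decoder() // assumed build method, as above
    // Switching writer schemas later means rebuilding the Decoder
    // with a different active fingerprint.
}
```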