EmilyMatt commented on code in PR #9241: URL: https://github.com/apache/arrow-rs/pull/9241#discussion_r2750078359
########## arrow-avro/src/writer/async_writer.rs: ########## @@ -0,0 +1,562 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! `async` API for writing [`RecordBatch`]es to Avro files +//! +//! This module provides async versions of the synchronous Avro writer. +//! See [`crate::writer`] for API details on the synchronous version. +//! +//! # Example +//! +//! ```no_run +//! use std::sync::Arc; +//! use arrow_array::{ArrayRef, Int64Array, RecordBatch}; +//! use arrow_schema::{DataType, Field, Schema}; +//! use arrow_avro::writer::AsyncAvroWriter; +//! +//! # #[tokio::main(flavor = "current_thread")] +//! # async fn main() -> Result<(), Box<dyn std::error::Error>> { +//! let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]); +//! let batch = RecordBatch::try_new( +//! Arc::new(schema.clone()), +//! vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef], +//! )?; +//! +//! let mut buffer = Vec::new(); +//! let mut writer = AsyncAvroWriter::new(&mut buffer, schema).await?; +//! writer.write(&batch).await?; +//! writer.finish().await?; +//! +//! let bytes = buffer.clone(); +//! assert!(!bytes.is_empty()); +//! # Ok(()) } +//! ``` +//! +//! # Features +//! +//! 
- **OCF format**: Write Avro Object Container Files with schema, sync markers, and optional compression +//! - **SOE format**: Write Avro Single Object Encoding streams for registry-based workflows +//! - **Flexible sinks**: Works with any `AsyncWrite + Send` type or custom `AsyncFileWriter` implementations +//! - **Compression**: Supports all compression codecs (Deflate, Snappy, ZStandard, etc.) +//! - **Feature-gated**: Requires `async` feature to use + +use crate::compression::CompressionCodec; +use crate::schema::{AvroSchema, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY}; +use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long}; +use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat}; +use arrow_array::RecordBatch; +use arrow_schema::{ArrowError, Schema}; +use bytes::Bytes; +use futures::future::{BoxFuture, FutureExt}; +use std::sync::Arc; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +/// The asynchronous interface used by [`AsyncWriter`] to write Avro files. +/// +/// This trait allows [`AsyncWriter`] to be generic over different output destinations, +/// such as files, network sockets, or in-memory buffers. It abstracts the async write +/// operations needed to produce Avro output. +/// +/// # Semantics +/// +/// - **[`write`](Self::write)**: Writes a chunk of bytes to the underlying sink. This may be +/// called multiple times during writing. Implementations should buffer or write directly as +/// appropriate. The bytes are provided as [`Bytes`] for efficient zero-copy handling. +/// +/// - **[`complete`](Self::complete)**: Signals that writing is finished. Implementations should +/// flush any buffered data and finalize the output (e.g., close file handles). After `complete` +/// returns `Ok(())`, no further `write` calls should be made. 
+/// +/// # Provided Implementations +/// +/// A blanket implementation is provided for all types implementing [`AsyncWrite`] + [`Unpin`] + [`Send`], +/// which covers common types like `tokio::fs::File`, `tokio::net::TcpStream`, and `Vec<u8>`. +/// +/// For custom sinks (e.g., object stores, cloud storage), implement this trait directly. +pub trait AsyncFileWriter: Send { + /// Write the provided bytes to the underlying writer. + /// + /// This method may be called multiple times during the writing process. + /// Each call provides a chunk of the Avro output that should be written + /// to the destination. + fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>>; + + /// Flush any buffered data and finish the writing process. + /// + /// This method should ensure all data is persisted to the underlying storage. + /// After `complete` returns `Ok(())`, the caller SHOULD NOT call `write` again. + fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>>; +} + +impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_> { + fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>> { + self.as_mut().write(bs) + } + + fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>> { + self.as_mut().complete() + } +} + +impl<T: AsyncWrite + Unpin + Send> AsyncFileWriter for T { + fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ArrowError>> { + async move { + self.write_all(&bs) + .await + .map_err(|e| ArrowError::IoError(format!("Error writing bytes: {e}"), e)) + } + .boxed() + } + + fn complete(&mut self) -> BoxFuture<'_, Result<(), ArrowError>> { + async move { + self.flush() + .await + .map_err(|e| ArrowError::IoError(format!("Error flushing: {e}"), e))?; + self.shutdown() + .await + .map_err(|e| ArrowError::IoError(format!("Error closing: {e}"), e)) + } + .boxed() + } +} + +/// Builder to configure and create an async `AsyncWriter`. 
+#[derive(Debug, Clone)] +pub struct AsyncWriterBuilder { + schema: Schema, + codec: Option<CompressionCodec>, + capacity: usize, + fingerprint_strategy: Option<FingerprintStrategy>, +} + +impl AsyncWriterBuilder { + /// Create a new builder with default settings. + /// + /// The Avro schema used for writing is determined as follows: + /// 1) If the Arrow schema metadata contains `avro.schema` (see `SCHEMA_METADATA_KEY`), + /// that JSON is used verbatim. + /// 2) Otherwise, the Arrow schema is converted to an Avro record schema. + /// + /// All other writer settings (compression codec, buffer capacity, fingerprint strategy) + /// are initialized to their defaults and can be customized using the corresponding + /// builder methods. + pub fn new(schema: Schema) -> Self { + Self { + schema, + codec: None, + capacity: 1024, + fingerprint_strategy: None, + } + } + + /// Set the fingerprinting strategy for the stream writer. + pub fn with_fingerprint_strategy(mut self, strategy: FingerprintStrategy) -> Self { + self.fingerprint_strategy = Some(strategy); + self + } + + /// Change the compression codec. + pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self { + self.codec = codec; + self + } + + /// Sets the capacity for internal buffers. + pub fn with_capacity(mut self, capacity: usize) -> Self { + self.capacity = capacity; + self + } + + /// Create a new async `AsyncWriter` with specified `AvroFormat`. 
+ pub async fn build<W, F>(self, mut writer: W) -> Result<AsyncWriter<W, F>, ArrowError> + where + W: AsyncFileWriter, + F: AvroFormat, + { + let mut format = F::default(); + let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) { + Some(json) => AvroSchema::new(json.clone()), + None => AvroSchema::try_from(&self.schema)?, + }; + let maybe_fingerprint = if F::NEEDS_PREFIX { + match self.fingerprint_strategy { + Some(FingerprintStrategy::Id(id)) => Some(crate::schema::Fingerprint::Id(id)), + Some(FingerprintStrategy::Id64(id)) => Some(crate::schema::Fingerprint::Id64(id)), + Some(strategy) => { + Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?) + } + None => Some( + avro_schema + .fingerprint(FingerprintAlgorithm::from(FingerprintStrategy::Rabin))?, + ), + } + } else { + None + }; + let mut md = self.schema.metadata().clone(); + md.insert( + SCHEMA_METADATA_KEY.to_string(), + avro_schema.clone().json_string, + ); + let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md)); + + // Start the stream (write header, etc.) + let mut header_buf = Vec::<u8>::with_capacity(256); + format.start_stream(&mut header_buf, &schema, self.codec)?; + if !header_buf.is_empty() { + writer.write(Bytes::from(header_buf)).await?; + } + + let avro_root = crate::codec::AvroFieldBuilder::new(&avro_schema.schema()?).build()?; + let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref()) + .with_fingerprint(maybe_fingerprint) + .build()?; + + Ok(AsyncWriter { + writer, + schema, + format, + compression: self.codec, + capacity: self.capacity, + encoder, + }) + } +} + +/// Generic async Avro writer. +/// +/// This type is generic over the output async sink (`W`) and the Avro format (`F`). 
+/// You'll usually use the concrete aliases: +/// +/// * **[`AsyncAvroWriter`]** for **OCF** (Object Container File) +/// * **[`AsyncAvroStreamWriter`]** for **SOE** Avro streams +pub struct AsyncWriter<W: AsyncFileWriter, F: AvroFormat> { + writer: W, + schema: Arc<Schema>, + format: F, + compression: Option<CompressionCodec>, + capacity: usize, + encoder: RecordEncoder, +} + +/// Alias for async **Object Container File** writer. +pub type AsyncAvroWriter<W> = AsyncWriter<W, AvroOcfFormat>; + +/// Alias for async **Single Object Encoding** stream writer. +pub type AsyncAvroStreamWriter<W> = AsyncWriter<W, AvroSoeFormat>; + +impl<W: AsyncFileWriter> AsyncAvroWriter<W> { + /// Create a new async Avro OCF writer. + pub async fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> { + AsyncWriterBuilder::new(schema) + .build::<W, AvroOcfFormat>(writer) + .await + } + + /// Return a reference to the 16-byte sync marker generated for this file. + pub fn sync_marker(&self) -> Option<&[u8; 16]> { + self.format.sync_marker() + } +} + +impl<W: AsyncFileWriter> AsyncAvroStreamWriter<W> { + /// Create a new async Single Object Encoding stream writer. + pub async fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> { + AsyncWriterBuilder::new(schema) + .build::<W, AvroSoeFormat>(writer) + .await + } +} + +impl<W: AsyncFileWriter, F: AvroFormat> AsyncWriter<W, F> { + /// Write a single [`RecordBatch`]. + pub async fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> { + if batch.schema().fields() != self.schema.fields() { + return Err(ArrowError::SchemaError( + "Schema of RecordBatch differs from Writer schema".to_string(), + )); + } + + match self.format.sync_marker() { + Some(&sync) => self.write_ocf_block(batch, &sync).await, Review Comment: I worry that this is really inefficient, writing a full block for a single batch means extremely small blocks, this should be done per size configuration. 
Then, after writing the batch, we can check whether the block has accumulated enough bytes and only then flush it; as it stands, this will make both compression and decoding inefficient. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
