This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 1eb62bd391 Unify API for writing column chunks / row groups in parallel (#8582)
1eb62bd391 is described below

commit 1eb62bd39119a003d5b240108055e3a6697a1cfd
Author: Adam Reeve <[email protected]>
AuthorDate: Wed Oct 15 08:51:44 2025 +1300

    Unify API for writing column chunks / row groups in parallel (#8582)
    
    # Which issue does this PR close?
    
    - Closes #8389.
    
    # Rationale for this change
    
    Simplify the API surface and provide only one way to write column
    chunks and row groups in parallel.
    
    # What changes are included in this PR?
    
    * Make `ArrowRowGroupWriterFactory` constructor public and simplify it
    to remove arguments that are available from the `SerializedFileWriter`.
    * Update `ArrowColumnWriter` example and test code to use the
    `ArrowRowGroupWriterFactory`.
    * Deprecate `parquet::arrow::arrow_writer::get_column_writers` and
    `parquet::arrow::arrow_writer::ArrowWriter::into_serialized_writer`
    (see the usage sketch below).
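    
    For illustration, a minimal sketch of the unified write path after this
    change, mirroring the updated `ArrowColumnWriter` doc example. The
    one-column `Int32` schema, the in-memory `out` sink, and names like
    `factory` are illustrative assumptions, not part of the change itself:
    
    ```rust
    use std::sync::Arc;
    
    use arrow_array::{ArrayRef, Int32Array};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::ArrowSchemaConverter;
    use parquet::arrow::arrow_writer::{ArrowRowGroupWriterFactory, compute_leaves};
    use parquet::file::properties::WriterProperties;
    use parquet::file::writer::SerializedFileWriter;
    
    fn main() -> parquet::errors::Result<()> {
        // Illustrative one-column schema and data
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
        let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    
        // Construct the SerializedFileWriter directly (no ArrowWriter needed)
        let props = Arc::new(WriterProperties::default());
        let parquet_schema = ArrowSchemaConverter::new()
            .with_coerce_types(props.coerce_types())
            .convert(&schema)?;
        let mut out = Vec::new(); // in-memory sink for the sketch
        let mut writer =
            SerializedFileWriter::new(&mut out, parquet_schema.root_schema_ptr(), props)?;
    
        // The factory is now built from the writer alone; it clones the Arcs it
        // needs, so it does not keep `writer` borrowed
        let factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
    
        // Encode column chunks for row group 0 (this could run on worker threads)
        let mut col_writers = factory.create_column_writers(0)?;
        for leaf in compute_leaves(schema.field(0), &array)? {
            col_writers[0].write(&leaf)?;
        }
    
        // Append the encoded chunks to the file as a new row group
        let mut row_group = writer.next_row_group()?;
        for col_writer in col_writers {
            col_writer.close()?.append_to_row_group(&mut row_group)?;
        }
        row_group.close()?;
        writer.close()?;
        Ok(())
    }
    ```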
    
    # Are these changes tested?
    
    Yes, covered by existing tests.
    
    # Are there any user-facing changes?
    
    Yes, this deprecates existing public methods.
---
 parquet/benches/arrow_writer.rs              |  14 ++-
 parquet/src/arrow/arrow_writer/mod.rs        | 130 ++++++++++++---------------
 parquet/src/file/writer.rs                   |   6 ++
 parquet/tests/encryption/encryption_async.rs |  36 +++++---
 4 files changed, 96 insertions(+), 90 deletions(-)

diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs
index 6053e02466..b92f0788b2 100644
--- a/parquet/benches/arrow_writer.rs
+++ b/parquet/benches/arrow_writer.rs
@@ -19,7 +19,7 @@
 extern crate criterion;
 
 use criterion::{Bencher, Criterion, Throughput};
-use parquet::arrow::arrow_writer::compute_leaves;
+use parquet::arrow::arrow_writer::{ArrowRowGroupWriterFactory, compute_leaves};
 use parquet::basic::{Compression, ZstdLevel};
 
 extern crate arrow;
@@ -33,8 +33,10 @@ use arrow::datatypes::*;
 use arrow::util::bench_util::{create_f16_array, create_f32_array, create_f64_array};
 use arrow::{record_batch::RecordBatch, util::data_gen::*};
 use arrow_array::RecordBatchOptions;
+use parquet::arrow::ArrowSchemaConverter;
+use parquet::errors::Result;
 use parquet::file::properties::{WriterProperties, WriterVersion};
-use parquet::{arrow::ArrowWriter, errors::Result};
+use parquet::file::writer::SerializedFileWriter;
 
 fn create_primitive_bench_batch(
     size: usize,
@@ -341,8 +343,12 @@ fn write_batch_with_option(
     props: Option<WriterProperties>,
 ) -> Result<()> {
     let mut file = Empty::default();
-    let writer = ArrowWriter::try_new(&mut file, batch.schema(), props)?;
-    let (_, row_group_writer_factory) = writer.into_serialized_writer()?;
+    let props = Arc::new(props.unwrap_or_default());
+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(props.coerce_types())
+        .convert(batch.schema_ref())?;
+    let writer = SerializedFileWriter::new(&mut file, parquet_schema.root_schema_ptr(), props)?;
+    let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, batch.schema());
 
     bench.iter(|| {
         let mut row_group = row_group_writer_factory.create_column_writers(0).unwrap();
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 66801d2d38..c2a7a6376f 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -48,7 +48,7 @@ use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
 use crate::file::reader::{ChunkReader, Length};
 use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
 use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
-use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
+use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
 use levels::{ArrayLevels, calculate_array_levels};
 
 mod byte_array;
@@ -252,7 +252,7 @@ impl<W: Write + Send> ArrowWriter<W> {
             SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
 
         let row_group_writer_factory =
-            ArrowRowGroupWriterFactory::new(&file_writer, schema, arrow_schema.clone(), props_ptr);
+            ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
 
         Ok(Self {
             writer: file_writer,
@@ -423,7 +423,10 @@ impl<W: Write + Send> ArrowWriter<W> {
     }
 
     /// Create a new row group writer and return its column writers.
-    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
+    #[deprecated(
+        since = "56.2.0",
+        note = "Use `ArrowRowGroupWriterFactory` instead, see 
`ArrowColumnWriter` for an example"
+    )]
     pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
         self.flush()?;
         let in_progress = self
@@ -433,7 +436,10 @@ impl<W: Write + Send> ArrowWriter<W> {
     }
 
     /// Append the given column chunks to the file as a new row group.
-    #[deprecated(since = "56.2.0", note = "Use into_serialized_writer instead")]
+    #[deprecated(
+        since = "56.2.0",
+        note = "Use `SerializedFileWriter` directly instead, see 
`ArrowColumnWriter` for an example"
+    )]
     pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
         let mut row_group_writer = self.writer.next_row_group()?;
         for chunk in chunks {
@@ -445,6 +451,10 @@ impl<W: Write + Send> ArrowWriter<W> {
 
     /// Converts this writer into a lower-level [`SerializedFileWriter`] and [`ArrowRowGroupWriterFactory`].
     /// This can be useful to provide more control over how files are written.
+    #[deprecated(
+        since = "57.0.0",
+        note = "Construct a `SerializedFileWriter` and 
`ArrowRowGroupWriterFactory` directly instead"
+    )]
     pub fn into_serialized_writer(
         mut self,
     ) -> Result<(SerializedFileWriter<W>, ArrowRowGroupWriterFactory)> {
@@ -693,6 +703,8 @@ impl ArrowColumnChunk {
 
 /// Encodes [`ArrowLeafColumn`] to [`ArrowColumnChunk`]
 ///
+/// `ArrowColumnWriter` instances can be created using an [`ArrowRowGroupWriterFactory`].
+///
 /// Note: This is a low-level interface for applications that require
 /// fine-grained control of encoding (e.g. encoding using multiple threads),
 /// see [`ArrowWriter`] for a higher-level interface
@@ -704,7 +716,7 @@ impl ArrowColumnChunk {
 /// # use arrow_array::*;
 /// # use arrow_schema::*;
 /// # use parquet::arrow::ArrowSchemaConverter;
-/// # use parquet::arrow::arrow_writer::{ArrowLeafColumn, compute_leaves, get_column_writers, ArrowColumnChunk};
+/// # use parquet::arrow::arrow_writer::{compute_leaves, ArrowColumnChunk, ArrowLeafColumn, ArrowRowGroupWriterFactory};
 /// # use parquet::file::properties::WriterProperties;
 /// # use parquet::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
 /// #
@@ -720,8 +732,17 @@ impl ArrowColumnChunk {
 ///   .convert(&schema)
 ///   .unwrap();
 ///
-/// // Create writers for each of the leaf columns
-/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
+/// // Create parquet writer
+/// let root_schema = parquet_schema.root_schema_ptr();
+/// // write to memory in the example, but this could be a File
+/// let mut out = Vec::with_capacity(1024);
+/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
+///   .unwrap();
+///
+/// // Create a factory for building Arrow column writers
+/// let row_group_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
+/// // Create column writers for the 0th row group
+/// let col_writers = row_group_factory.create_column_writers(0).unwrap();
 ///
 /// // Spawn a worker thread for each column
 /// //
@@ -744,13 +765,6 @@ impl ArrowColumnChunk {
 ///     })
 ///     .collect();
 ///
-/// // Create parquet writer
-/// let root_schema = parquet_schema.root_schema_ptr();
-/// // write to memory in the example, but this could be a File
-/// let mut out = Vec::with_capacity(1024);
-/// let mut writer = SerializedFileWriter::new(&mut out, root_schema, props.clone())
-///   .unwrap();
-///
 /// // Start row group
 /// let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer
 ///   .next_row_group()
@@ -894,7 +908,7 @@ impl ArrowRowGroupWriter {
 
 /// Factory that creates new column writers for each row group in the Parquet file.
 pub struct ArrowRowGroupWriterFactory {
-    schema: SchemaDescriptor,
+    schema: SchemaDescPtr,
     arrow_schema: SchemaRef,
     props: WriterPropertiesPtr,
     #[cfg(feature = "encryption")]
@@ -902,61 +916,57 @@ pub struct ArrowRowGroupWriterFactory {
 }
 
 impl ArrowRowGroupWriterFactory {
-    #[cfg(feature = "encryption")]
-    fn new<W: Write + Send>(
+    /// Create a new [`ArrowRowGroupWriterFactory`] for the provided file writer and Arrow schema
+    pub fn new<W: Write + Send>(
         file_writer: &SerializedFileWriter<W>,
-        schema: SchemaDescriptor,
         arrow_schema: SchemaRef,
-        props: WriterPropertiesPtr,
     ) -> Self {
+        let schema = Arc::clone(file_writer.schema_descr_ptr());
+        let props = Arc::clone(file_writer.properties());
         Self {
             schema,
             arrow_schema,
             props,
+            #[cfg(feature = "encryption")]
             file_encryptor: file_writer.file_encryptor(),
         }
     }
 
-    #[cfg(not(feature = "encryption"))]
-    fn new<W: Write + Send>(
-        _file_writer: &SerializedFileWriter<W>,
-        schema: SchemaDescriptor,
-        arrow_schema: SchemaRef,
-        props: WriterPropertiesPtr,
-    ) -> Self {
-        Self {
-            schema,
-            arrow_schema,
-            props,
+    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
+        let writers = self.create_column_writers(row_group_index)?;
+        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
+    }
+
+    /// Create column writers for a new row group.
+    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
+        let mut writers = Vec::with_capacity(self.arrow_schema.fields.len());
+        let mut leaves = self.schema.columns().iter();
+        let column_factory = self.column_writer_factory(row_group_index);
+        for field in &self.arrow_schema.fields {
+            column_factory.get_arrow_column_writer(
+                field.data_type(),
+                &self.props,
+                &mut leaves,
+                &mut writers,
+            )?;
         }
+        Ok(writers)
     }
 
     #[cfg(feature = "encryption")]
-    fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
-        let writers = get_column_writers_with_encryptor(
-            &self.schema,
-            &self.props,
-            &self.arrow_schema,
-            self.file_encryptor.clone(),
-            row_group_index,
-        )?;
-        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
+    fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
+        ArrowColumnWriterFactory::new()
+            .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
     }
 
     #[cfg(not(feature = "encryption"))]
-    fn create_row_group_writer(&self, _row_group_index: usize) -> Result<ArrowRowGroupWriter> {
-        let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
-        Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
-    }
-
-    /// Create column writers for a new row group.
-    pub fn create_column_writers(&self, row_group_index: usize) -> Result<Vec<ArrowColumnWriter>> {
-        let rg_writer = self.create_row_group_writer(row_group_index)?;
-        Ok(rg_writer.writers)
+    fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
+        ArrowColumnWriterFactory::new()
     }
 }
 
 /// Returns [`ArrowColumnWriter`]s for each column in a given schema
+#[deprecated(since = "57.0.0", note = "Use `ArrowRowGroupWriterFactory` instead")]
 pub fn get_column_writers(
     parquet: &SchemaDescriptor,
     props: &WriterPropertiesPtr,
@@ -976,30 +986,6 @@ pub fn get_column_writers(
     Ok(writers)
 }
 
-/// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption
-#[cfg(feature = "encryption")]
-fn get_column_writers_with_encryptor(
-    parquet: &SchemaDescriptor,
-    props: &WriterPropertiesPtr,
-    arrow: &SchemaRef,
-    file_encryptor: Option<Arc<FileEncryptor>>,
-    row_group_index: usize,
-) -> Result<Vec<ArrowColumnWriter>> {
-    let mut writers = Vec::with_capacity(arrow.fields.len());
-    let mut leaves = parquet.columns().iter();
-    let column_factory =
-        ArrowColumnWriterFactory::new().with_file_encryptor(row_group_index, file_encryptor);
-    for field in &arrow.fields {
-        column_factory.get_arrow_column_writer(
-            field.data_type(),
-            props,
-            &mut leaves,
-            &mut writers,
-        )?;
-    }
-    Ok(writers)
-}
-
 /// Creates [`ArrowColumnWriter`] instances
 struct ArrowColumnWriterFactory {
     #[cfg(feature = "encryption")]
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 4533d25401..a77ce266d1 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -392,6 +392,12 @@ impl<W: Write + Send> SerializedFileWriter<W> {
         &self.descr
     }
 
+    /// Returns a reference to the schema descriptor `Arc`.
+    #[cfg(feature = "arrow")]
+    pub(crate) fn schema_descr_ptr(&self) -> &SchemaDescPtr {
+        &self.descr
+    }
+
     /// Returns a reference to the writer properties
     pub fn properties(&self) -> &WriterPropertiesPtr {
         &self.props
diff --git a/parquet/tests/encryption/encryption_async.rs b/parquet/tests/encryption/encryption_async.rs
index 6da3d2d11e..ccbb2b0bff 100644
--- a/parquet/tests/encryption/encryption_async.rs
+++ b/parquet/tests/encryption/encryption_async.rs
@@ -24,13 +24,14 @@ use crate::encryption_util::{
 use arrow_array::RecordBatch;
 use arrow_schema::Schema;
 use futures::TryStreamExt;
-use parquet::arrow::ParquetRecordBatchStreamBuilder;
 use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
 use parquet::arrow::arrow_writer::{
     ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn, ArrowRowGroupWriterFactory,
     ArrowWriterOptions, compute_leaves,
 };
-use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
+use parquet::arrow::{
+    ArrowSchemaConverter, ArrowWriter, AsyncArrowWriter, ParquetRecordBatchStreamBuilder,
+};
 use parquet::encryption::decrypt::FileDecryptionProperties;
 use parquet::encryption::encrypt::FileEncryptionProperties;
 use parquet::errors::ParquetError;
@@ -696,18 +697,22 @@ async fn test_concurrent_encrypted_writing_over_multiple_row_groups() {
         }
     });
 
-    let props = Some(
+    let props = Arc::new(
         WriterPropertiesBuilder::default()
             .with_file_encryption_properties(file_encryption_properties)
             .build(),
     );
+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(props.coerce_types())
+        .convert(schema)
+        .unwrap();
 
     // Create a temporary file to write the encrypted data
     let temp_file = tempfile::tempfile().unwrap();
-    let arrow_writer =
-        ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap();
 
-    let (writer, row_group_writer_factory) = arrow_writer.into_serialized_writer().unwrap();
+    let writer =
+        SerializedFileWriter::new(&temp_file, parquet_schema.root_schema_ptr(), props).unwrap();
+    let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(schema));
     let max_row_groups = 1;
 
     let (serialize_tx, serialize_rx) =
@@ -757,19 +762,22 @@ async fn test_multi_threaded_encrypted_writing() {
         read_encrypted_file(&file, decryption_properties.clone()).unwrap();
     let schema = metadata.schema().clone();
 
-    let props = Some(
+    let props = Arc::new(
         WriterPropertiesBuilder::default()
             .with_file_encryption_properties(file_encryption_properties)
             .build(),
     );
 
+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(props.coerce_types())
+        .convert(&schema)
+        .unwrap();
+
     // Create a temporary file to write the encrypted data
     let temp_file = tempfile::tempfile().unwrap();
-    let writer =
-        ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props.clone()).unwrap();
-
-    let (mut serialized_file_writer, row_group_writer_factory) =
-        writer.into_serialized_writer().unwrap();
+    let mut writer =
+        SerializedFileWriter::new(&temp_file, parquet_schema.root_schema_ptr(), props).unwrap();
+    let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, Arc::clone(&schema));
 
     let (serialize_tx, mut serialize_rx) =
         tokio::sync::mpsc::channel::<JoinHandle<RBStreamSerializeResult>>(1);
@@ -805,7 +813,7 @@ async fn test_multi_threaded_encrypted_writing() {
     // Append the finalized row groups to the SerializedFileWriter
     while let Some(task) = serialize_rx.recv().await {
         let (arrow_column_chunks, _) = task.await.unwrap().unwrap();
-        let mut row_group_writer = serialized_file_writer.next_row_group().unwrap();
+        let mut row_group_writer = writer.next_row_group().unwrap();
         for chunk in arrow_column_chunks {
             chunk.append_to_row_group(&mut row_group_writer).unwrap();
         }
@@ -815,7 +823,7 @@ async fn test_multi_threaded_encrypted_writing() {
     // Wait for data generator and serialization task to finish
     data_generator.await.unwrap();
     launch_serialization_task.await.unwrap();
-    let metadata = serialized_file_writer.close().unwrap();
+    let metadata = writer.close().unwrap();
 
     // Close the file writer which writes the footer
     assert_eq!(metadata.file_metadata().num_rows(), 50);
