adamreeve commented on code in PR #7818:
URL: https://github.com/apache/arrow-rs/pull/7818#discussion_r2173944156


##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -789,44 +790,54 @@ impl ArrowRowGroupWriter {
     }
 }
 
-struct ArrowRowGroupWriterFactory {
+/// Factory for creating [`ArrowRowGroupWriter`] instances.
+/// This is used by [`ArrowWriter`] to create row group writers, but can be used
+/// directly for lower level API.
+pub struct ArrowRowGroupWriterFactory {
     #[cfg(feature = "encryption")]
     file_encryptor: Option<Arc<FileEncryptor>>,
 }
 
 impl ArrowRowGroupWriterFactory {
+    /// Creates a new [`ArrowRowGroupWriterFactory`] using provided [`SerializedFileWriter`].
     #[cfg(feature = "encryption")]
-    fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
+    pub fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
         Self {
             file_encryptor: file_writer.file_encryptor(),
         }
     }
 
     #[cfg(not(feature = "encryption"))]
-    fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
+    pub fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
         Self {}
     }
 
+    /// Creates a new [`ArrowRowGroupWriter`] for the given parquet schema and writer properties.
     #[cfg(feature = "encryption")]
-    fn create_row_group_writer(
+    pub fn create_row_group_writer(
         &self,
         parquet: &SchemaDescriptor,
         props: &WriterPropertiesPtr,
         arrow: &SchemaRef,
         row_group_index: usize,
     ) -> Result<ArrowRowGroupWriter> {
-        let writers = get_column_writers_with_encryptor(
-            parquet,
-            props,
-            arrow,
-            self.file_encryptor.clone(),
-            row_group_index,
-        )?;
+        let mut writers = Vec::with_capacity(arrow.fields.len());

Review Comment:
   Why is this changed? It doesn't look different to `get_column_writers_with_encryptor`



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -141,7 +141,7 @@ pub struct ArrowWriter<W: Write> {
     arrow_schema: SchemaRef,
 
     /// Creates new [`ArrowRowGroupWriter`] instances as required
-    row_group_writer_factory: ArrowRowGroupWriterFactory,
+    pub row_group_writer_factory: ArrowRowGroupWriterFactory,

Review Comment:
   Why is this `pub`?



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -883,14 +894,21 @@ fn get_column_writers_with_encryptor(
 }
 
 /// Gets [`ArrowColumnWriter`] instances for different data types
-struct ArrowColumnWriterFactory {
+pub struct ArrowColumnWriterFactory {

Review Comment:
   Why is this pub?



##########
parquet/src/encryption/encrypt.rs:
##########
@@ -288,14 +288,14 @@ impl EncryptionPropertiesBuilder {
 
 #[derive(Debug)]
 /// The encryption configuration for a single Parquet file
-pub(crate) struct FileEncryptor {
+pub struct FileEncryptor {

Review Comment:
   ?



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -860,7 +871,7 @@ pub fn get_column_writers(
 
 /// Returns the [`ArrowColumnWriter`] for a given schema and supports columnar encryption
 #[cfg(feature = "encryption")]
-fn get_column_writers_with_encryptor(
+pub fn get_column_writers_with_encryptor(

Review Comment:
   Why is this pub?



##########
parquet/tests/encryption/encryption.rs:
##########
@@ -1101,3 +1111,109 @@ fn read_and_roundtrip_to_encrypted_file(
     // check re-written example data
     verify_encryption_test_file_read(temp_file, decryption_properties);
 }
+
+#[tokio::test]
+async fn test_multi_threaded_encrypted_writing() {
+    // Read example data and set up encryption/decryption properties
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+    let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+
+    let (record_batches, metadata) =
+        read_encrypted_file(&path, decryption_properties.clone()).unwrap();
+    let to_write: Vec<_> = record_batches
+        .iter()
+        .flat_map(|rb| rb.columns().to_vec())
+        .collect();
+    let schema = metadata.schema().clone();
+
+    let props = Arc::new(
+        WriterPropertiesBuilder::with_defaults()
+            .with_file_encryption_properties(file_encryption_properties)
+            .build(),
+    );
+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(props.coerce_types())
+        .convert(&schema)
+        .unwrap();
+    let root_schema = parquet_schema.root_schema_ptr();
+
+    // Create a temporary file to write the encrypted data
+    let temp_file = tempfile::NamedTempFile::new().unwrap();
+    let mut file_writer =
+        SerializedFileWriter::new(&temp_file, root_schema.clone(), props.clone()).unwrap();
+
+    let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(&file_writer);
+    let arrow_row_group_writer = arrow_row_group_writer_factory
+        .create_row_group_writer(&parquet_schema, &props.clone(), &schema, 0)

Review Comment:
   ```suggestion
           .create_row_group_writer(&parquet_schema, &props, &schema, 0)
   ```
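   (Passing `&props` avoids cloning the `Arc` only to immediately borrow the temporary clone.)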



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -789,44 +790,54 @@ impl ArrowRowGroupWriter {
     }
 }
 
-struct ArrowRowGroupWriterFactory {
+/// Factory for creating [`ArrowRowGroupWriter`] instances.
+/// This is used by [`ArrowWriter`] to create row group writers, but can be used
+/// directly for lower level API.
+pub struct ArrowRowGroupWriterFactory {
     #[cfg(feature = "encryption")]
     file_encryptor: Option<Arc<FileEncryptor>>,
 }
 
 impl ArrowRowGroupWriterFactory {
+    /// Creates a new [`ArrowRowGroupWriterFactory`] using provided [`SerializedFileWriter`].
     #[cfg(feature = "encryption")]
-    fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
+    pub fn new<W: Write + Send>(file_writer: &SerializedFileWriter<W>) -> Self {
         Self {
             file_encryptor: file_writer.file_encryptor(),
         }
     }
 
     #[cfg(not(feature = "encryption"))]
-    fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
+    pub fn new<W: Write + Send>(_file_writer: &SerializedFileWriter<W>) -> Self {
         Self {}
     }
 
+    /// Creates a new [`ArrowRowGroupWriter`] for the given parquet schema and writer properties.
     #[cfg(feature = "encryption")]
-    fn create_row_group_writer(
+    pub fn create_row_group_writer(

Review Comment:
   Rather than having to pass the `SchemaDescriptor`, `WriterPropertiesPtr` and `SchemaRef` each time we create a new row group writer, this should probably be refactored so that we store those in the `ArrowRowGroupWriterFactory` and set them in `new`, as they shouldn't be allowed to change between row groups.
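
   Something like the following, for example (untested sketch; the field and parameter names are just illustrative, and it assumes `get_column_writers`, `get_column_writers_with_encryptor` and `ArrowRowGroupWriter::new` keep their current signatures):

   ```rust
   // Rough sketch only: the factory owns the per-file state, so callers only
   // pass the row group index when creating each writer.
   pub struct ArrowRowGroupWriterFactory {
       schema: SchemaDescriptor,
       arrow_schema: SchemaRef,
       props: WriterPropertiesPtr,
       #[cfg(feature = "encryption")]
       file_encryptor: Option<Arc<FileEncryptor>>,
   }

   impl ArrowRowGroupWriterFactory {
       pub fn new<W: Write + Send>(
           file_writer: &SerializedFileWriter<W>,
           schema: SchemaDescriptor,
           arrow_schema: SchemaRef,
           props: WriterPropertiesPtr,
       ) -> Self {
           Self {
               schema,
               arrow_schema,
               props,
               #[cfg(feature = "encryption")]
               file_encryptor: file_writer.file_encryptor(),
           }
       }

       /// `row_group_index` is only needed when encryption is enabled.
       pub fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
           #[cfg(feature = "encryption")]
           let writers = get_column_writers_with_encryptor(
               &self.schema,
               &self.props,
               &self.arrow_schema,
               self.file_encryptor.clone(),
               row_group_index,
           )?;
           #[cfg(not(feature = "encryption"))]
           let writers = get_column_writers(&self.schema, &self.props, &self.arrow_schema)?;
           Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
       }
   }
   ```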


