adamreeve commented on code in PR #7818:
URL: https://github.com/apache/arrow-rs/pull/7818#discussion_r2178885296


##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -755,8 +758,9 @@ impl ArrowColumnWriter {
 }
 
 /// Encodes [`RecordBatch`] to a parquet row group
-struct ArrowRowGroupWriter {
-    writers: Vec<ArrowColumnWriter>,
+pub struct ArrowRowGroupWriter {
+    /// [`ArrowColumnWriter`] for each column in a row group
+    pub writers: Vec<ArrowColumnWriter>,

Review Comment:
   Rather than exposing this as a public member, it might be cleaner to add an 
`into_inner` or `into_column_writers` method that consumes the 
`ArrowRowGroupWriter` and returns the column writers. Then it's clearer that 
you probably shouldn't mix writing directly to the column writers with using 
the other methods on the `ArrowRowGroupWriter`.
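   
   A minimal sketch of what such a consuming method could look like (the name `into_column_writers` is only a suggestion here, not an existing API):
   ```rust
   impl ArrowRowGroupWriter {
       /// Consume the row group writer and hand back its column writers,
       /// e.g. so each one can be driven from its own task or thread.
       pub fn into_column_writers(self) -> Vec<ArrowColumnWriter> {
           self.writers
       }
   }
   ```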



##########
parquet/tests/encryption/encryption.rs:
##########
@@ -1101,3 +1109,117 @@ fn read_and_roundtrip_to_encrypted_file(
     // check re-written example data
     verify_encryption_test_file_read(temp_file, decryption_properties);
 }
+
+#[tokio::test]
+async fn test_multi_threaded_encrypted_writing() {
+    // Read example data and set up encryption/decryption properties
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+    let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+
+    let (record_batches, metadata) =
+        read_encrypted_file(&path, decryption_properties.clone()).unwrap();
+    let to_write: Vec<_> = record_batches
+        .iter()
+        .flat_map(|rb| rb.columns().to_vec())
+        .collect();
+    let schema = metadata.schema().clone();
+
+    let props = Arc::new(
+        WriterPropertiesBuilder::with_defaults()
+            .with_file_encryption_properties(file_encryption_properties)
+            .build(),
+    );
+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(props.coerce_types())
+        .convert(&schema)
+        .unwrap();
+    let root_schema = parquet_schema.root_schema_ptr();
+
+    // Create a temporary file to write the encrypted data
+    let temp_file = tempfile::NamedTempFile::new().unwrap();
+    let mut file_writer =
+        SerializedFileWriter::new(&temp_file, root_schema.clone(), props.clone()).unwrap();
+
+    let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new(
+        &file_writer,
+        parquet_schema,
+        schema.clone(),
+        props.clone(),
+    );
+    let arrow_row_group_writer = arrow_row_group_writer_factory
+        .create_row_group_writer(0)
+        .unwrap();
+
+    // Get column writers with encryptor from ArrowRowGroupWriter
+    let col_writers = arrow_row_group_writer.writers;
+    let num_columns = col_writers.len();
+
+    // Create a channel for each column writer to send ArrowLeafColumn data to
+    let mut col_writer_tasks = Vec::with_capacity(num_columns);
+    let mut col_array_channels = Vec::with_capacity(num_columns);
+    for mut writer in col_writers.into_iter() {
+        let (send_array, mut receive_array) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(100);
+        col_array_channels.push(send_array);
+        let handle = tokio::spawn(async move {
+            while let Some(col) = receive_array.recv().await {
+                let _ = writer.write(&col);
+            }
+            writer.close().unwrap()
+        });
+        col_writer_tasks.push(handle);
+    }
+
+    // Send the ArrowLeafColumn data to the respective column writer channels
+    for (channel_idx, (array, field)) in to_write.iter().zip(schema.fields()).enumerate() {
+        for c in compute_leaves(field, array).into_iter().flatten() {
+            let _ = col_array_channels[channel_idx].send(c).await;
+        }
+    }

Review Comment:
   This doesn't look like the correct logic if you have a nested schema, e.g. with struct-typed columns. I guess it works fine in this test, but it should be corrected in case someone tries to use it as an example.
   
   Above, one channel is created for each leaf-level column writer, but then `col_array_channels` is indexed using `channel_idx`, which is the index of the top-level Arrow schema field.
   
   I'd expect this to look a bit more like the example 
[here](https://github.com/apache/arrow-rs/blob/52ad7d703acff0e5b4c143c179df206a842bf24e/parquet/src/arrow/arrow_writer/mod.rs#L662-L668).
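   
   For reference, a rough sketch of leaf-ordered routing using the names from this test (an illustration of the idea, not the exact code from the linked example):
   ```rust
   // Walk the per-leaf channels in the same order that compute_leaves
   // produces leaves, rather than indexing by the top-level field position.
   let mut channel_iter = col_array_channels.iter();
   for (array, field) in to_write.iter().zip(schema.fields()) {
       for leaf in compute_leaves(field, array).unwrap() {
           channel_iter
               .next()
               .expect("one channel per leaf column")
               .send(leaf)
               .await
               .unwrap();
       }
   }
   ```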



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -199,9 +199,14 @@ impl<W: Write + Send> ArrowWriter<W> {
         let max_row_group_size = props.max_row_group_size();
 
         let file_writer =
-            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props))?;
+            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::new(props.clone()))?;

Review Comment:
   We can avoid cloning all the properties and just clone the Arc, e.g.:
   ```rust
   let props_ptr = Arc::new(props);
   let file_writer =
            SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
   ```
   (then change `props.into()` below to `props_ptr`)


