adamreeve commented on code in PR #7818: URL: https://github.com/apache/arrow-rs/pull/7818#discussion_r2188109111
########## parquet/tests/encryption/encryption.rs: ########## @@ -1101,3 +1109,117 @@ fn read_and_roundtrip_to_encrypted_file( // check re-written example data verify_encryption_test_file_read(temp_file, decryption_properties); } + +#[tokio::test] +async fn test_multi_threaded_encrypted_writing() { + // Read example data and set up encryption/decryption properties + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); + + let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + + let (record_batches, metadata) = + read_encrypted_file(&path, decryption_properties.clone()).unwrap(); + let to_write: Vec<_> = record_batches + .iter() + .flat_map(|rb| rb.columns().to_vec()) + .collect(); + let schema = metadata.schema().clone(); + + let props = Arc::new( + WriterPropertiesBuilder::with_defaults() + .with_file_encryption_properties(file_encryption_properties) + .build(), + ); + let parquet_schema = ArrowSchemaConverter::new() + .with_coerce_types(props.coerce_types()) + .convert(&schema) + .unwrap(); + let root_schema = parquet_schema.root_schema_ptr(); + + // Create a temporary file to write the encrypted data + let temp_file = tempfile::NamedTempFile::new().unwrap(); + let mut file_writer = + SerializedFileWriter::new(&temp_file, root_schema.clone(), props.clone()).unwrap(); + + let arrow_row_group_writer_factory = ArrowRowGroupWriterFactory::new( + &file_writer, + parquet_schema, + schema.clone(), + props.clone(), + ); + let arrow_row_group_writer = 
arrow_row_group_writer_factory + .create_row_group_writer(0) + .unwrap(); + + // Get column writers with encryptor from ArrowRowGroupWriter + let col_writers = arrow_row_group_writer.writers; + let num_columns = col_writers.len(); + + // Create a channel for each column writer to send ArrowLeafColumn data to + let mut col_writer_tasks = Vec::with_capacity(num_columns); + let mut col_array_channels = Vec::with_capacity(num_columns); + for mut writer in col_writers.into_iter() { + let (send_array, mut receive_array) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(100); + col_array_channels.push(send_array); + let handle = tokio::spawn(async move { + while let Some(col) = receive_array.recv().await { + let _ = writer.write(&col); + } + writer.close().unwrap() + }); + col_writer_tasks.push(handle); + } + + // Send the ArrowLeafColumn data to the respective column writer channels + for (channel_idx, (array, field)) in to_write.iter().zip(schema.fields()).enumerate() { + for c in compute_leaves(field, array).into_iter().flatten() { + let _ = col_array_channels[channel_idx].send(c).await; + } + } Review Comment: I don't think this needs an extra test case given the implementation doesn't do anything to specifically handle struct types. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org